Skip to main content

pond/adapter/
claude_code.rs

1//! Claude Code CLI adapter.
2//!
3//! Source path: `~/.claude/projects/<encoded-project-path>/<session-uuid>.jsonl`.
4//! Each `.jsonl` file is one session; lines are typed entries linked via a
5//! `parentUuid` -> `uuid` chain. Tool results arrive as `user` entries whose
6//! `message.content[]` contains `tool_result` blocks with a parallel
7//! `toolUseResult` field carrying structured data.
8
9use std::{
10    collections::{HashMap, HashSet},
11    path::{Path, PathBuf},
12};
13
14use anyhow::Context as _;
15use chrono::{DateTime, SecondsFormat, Utc};
16use serde_json::{Value, json};
17
18use crate::{
19    sessions::IngestEvent,
20    wire::{FileData, Message, Part, PartKind, Provenance, ProviderOptions, Session},
21};
22
23use super::{
24    Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
25    RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
26    empty_options,
27    extract::{
28        Extracted, Source, extract_compact_repr, extract_raw_record, extract_self_str, extract_str,
29    },
30    extracted_text,
31    jsonl::{BoundedRow, JsonlTree, jsonl_tree_discover, jsonl_tree_events, source_line},
32    jsonl_bytes, part_id, part_ordinal, raw_record,
33};
34
/// Per-file streaming state that persists across rows of one JSONL file.
/// It is this adapter's `JsonlTree::State`, threaded through the per-file
/// loop of [`jsonl_tree_events`] (which [`Adapter::events_with`] delegates
/// to), and is reset whenever that loop advances to the next file.
///
/// Two responsibilities:
///
/// 1. **Replay dedup.** Claude Code's `/resume` and `/compact` paths
///    occasionally re-emit byte-identical rows with the same `uuid` (the
///    stale-`messageSet`-cache bug in claude-code, see
///    `utils/sessionStorage.ts`). The adapter dedupes here so the validator
///    never sees the same PK twice; validator HashSet stays a safety net.
///
/// 2. **`tool_use_id -> tool name` resolution.** The raw `tool_result` row
///    carries only `tool_use_id`, not the tool name; the name lives on the
///    prior `tool_use` row in the same file. We populate this map when we
///    see a `tool_use` part, then look it up when we see the matching
///    `tool_result` part. Misses (e.g. compaction pruned the tool_use)
///    surface as `name: None` in `PartKind::ToolResult` rather than the
///    old `"unknown"` sentinel - faithful to the source rather than
///    inventing a value.
#[derive(Debug, Default)]
pub(crate) struct FileState {
    /// `uuid`s of rows already turned into events from this file; a row whose
    /// `uuid` is already present is skipped as a replay.
    seen_uuids: HashSet<String>,
    /// `tool_use` / `server_tool_use` block id -> tool name, filled by
    /// `capture_tool_call_names` as rows stream past.
    tool_call_names: HashMap<String, Extracted<String>>,
}
60
/// Stable adapter name. Surfaces as the `[sources.claude-code]` config key
/// and the `pond sync claude-code` CLI arg. Top-level sessions carry this
/// same string as `Session.source_agent`; subagent sessions carry
/// `claude-code/<agentType>` instead (or `claude-code/subagent` when the
/// sidecar `.meta.json` provides no `agentType`).
const NAME: &str = "claude-code";

/// Stateless factory: opens [`ClaudeCodeAdapter`] instances from config,
/// probes for the canonical install location under `~/.claude/projects`,
/// and serializes stored sessions back into Claude Code's on-disk layout.
pub struct ClaudeCodeFactory;
69
70impl AdapterFactory for ClaudeCodeFactory {
71    fn name(&self) -> &'static str {
72        NAME
73    }
74
75    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
76        Ok(Box::new(ClaudeCodeAdapter::new(config_path(NAME, config)?)))
77    }
78
79    fn probe_default(&self, env: &Env) -> Option<Value> {
80        let path = env.home.join(".claude").join("projects");
81        path.exists().then(|| json!({ "path": path }))
82    }
83
84    fn serialize(
85        &self,
86        session: &crate::sessions::SessionWithMessages,
87        fidelity: RestoreFidelity,
88    ) -> Result<Vec<RestoredFile>, AdapterError> {
89        serialize_session(session, fidelity)
90    }
91}
92
93fn serialize_session(
94    session: &crate::sessions::SessionWithMessages,
95    fidelity: RestoreFidelity,
96) -> Result<Vec<RestoredFile>, AdapterError> {
97    let mut messages = session.messages.clone();
98    if fidelity == RestoreFidelity::Native {
99        messages.sort_by(|left, right| {
100            source_line(left.message.options())
101                .cmp(&source_line(right.message.options()))
102                .then_with(|| by_timestamp_then_id(left, right))
103        });
104    } else {
105        messages.sort_by(by_timestamp_then_id);
106    }
107    // Native replays the verbatim `options.source.raw_record`; `claude_record`
108    // below is foreign-only. Replay echoes a frozen snapshot - safe only while
109    // canonical is append-only (spec.md#adapter-integrity-additive-sync).
110    let mut records = Vec::with_capacity(messages.len());
111    let mut parent_uuid = None::<String>;
112    for message in &messages {
113        if fidelity == RestoreFidelity::Native
114            && let Some(raw) = raw_record(message.message.options())
115        {
116            parent_uuid = raw
117                .get("uuid")
118                .and_then(Value::as_str)
119                .map(ToOwned::to_owned)
120                .or(parent_uuid);
121            records.push(raw);
122            continue;
123        }
124        // `claude_record` returns `None` for a dropped System message;
125        // `parent_uuid` then stays put so the chain skips over the gap.
126        let Some(record) = claude_record(session, message, parent_uuid.as_deref()) else {
127            continue;
128        };
129        parent_uuid = record
130            .get("uuid")
131            .and_then(Value::as_str)
132            .map(ToOwned::to_owned);
133        records.push(record);
134    }
135
136    let mut files = vec![RestoredFile::new(
137        claude_relative_path(session),
138        jsonl_bytes(NAME, &records)?,
139        fidelity,
140    )];
141    if session.session.parent_session_id.is_some()
142        && let Some(meta) = subagent_meta_record(session)
143    {
144        let mut meta_path = files[0].relative_path.clone();
145        meta_path.set_extension("meta.json");
146        files.push(RestoredFile::new(
147            meta_path,
148            serde_json::to_vec(&meta).map_err(|err| {
149                AdapterError::schema(
150                    NAME,
151                    &session.session.id,
152                    format!("json encode failed: {err}"),
153                )
154            })?,
155            fidelity,
156        ));
157    }
158    Ok(files)
159}
160
161fn claude_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
162    let encoded_project = session
163        .session
164        .options
165        .get("source")
166        .and_then(|source| source.get("project_dir"))
167        .and_then(Value::as_str)
168        .map(ToOwned::to_owned)
169        .unwrap_or_else(|| encode_project(&session.session.project));
170    if let Some(parent) = &session.session.parent_session_id {
171        // The child id is `<parent>/<child_suffix>`; the suffix is the file's
172        // path under `subagents/` (`agent-<hash>` flat, or
173        // `workflows/<wf-id>/agent-<hash>` nested), so stripping the parent
174        // prefix reconstructs the on-disk path verbatim.
175        let child_suffix = session
176            .session
177            .id
178            .strip_prefix(&format!("{parent}/"))
179            .unwrap_or(&session.session.id);
180        return PathBuf::from(encoded_project)
181            .join(parent)
182            .join("subagents")
183            .join(format!("{child_suffix}.jsonl"));
184    }
185    PathBuf::from(encoded_project).join(format!("{}.jsonl", session.session.id))
186}
187
/// Encodes a workspace path into a project directory name by turning every
/// `/` and `.` into `-`; all other characters pass through unchanged.
fn encode_project(project: &str) -> String {
    project
        .chars()
        .map(|c| if matches!(c, '/' | '.') { '-' } else { c })
        .collect()
}
191
192fn subagent_meta_record(session: &crate::sessions::SessionWithMessages) -> Option<Value> {
193    // Restore the sidecar `.meta.json` verbatim from the stored copy. A
194    // subagent ingested without a meta file stored `meta: null` - nothing
195    // to write back.
196    let meta = session.session.options.get("subagent")?.get("meta")?;
197    meta.is_object().then(|| meta.clone())
198}
199
200fn claude_record(
201    session: &crate::sessions::SessionWithMessages,
202    message: &crate::sessions::MessageWithParts,
203    parent_uuid: Option<&str>,
204) -> Option<Value> {
205    // Foreign restore into Claude Code (native restore re-emits the stored
206    // `raw_record` and never reaches here). Claude Code's transcript has only
207    // `user` and `assistant` rows: a tool result is a `user` row, and there
208    // is no in-transcript system turn - a System message (a rule-3 carrier or
209    // a source's own system/developer turn) has no idiomatic home and is
210    // dropped; the content stays in canonical (spec.md#adapter-native-restore-lossless,
211    // foreign clause).
212    let row_role = match &message.message {
213        Message::System { .. } => return None,
214        Message::User { .. } | Message::Tool { .. } => "user",
215        Message::Assistant { .. } => "assistant",
216    };
217    let mut envelope = serde_json::Map::new();
218    envelope.insert("role".to_owned(), Value::String(row_role.to_owned()));
219    if row_role == "assistant" {
220        // `type:"message"` is the Anthropic Messages API object discriminator
221        // - a constant, always present on a real assistant row.
222        envelope.insert("type".to_owned(), Value::String("message".to_owned()));
223    }
224    envelope.insert(
225        "content".to_owned(),
226        Value::Array(message.parts.iter().map(claude_part).collect()),
227    );
228    Some(json!({
229        "parentUuid": parent_uuid,
230        "isSidechain": false,
231        "userType": "external",
232        "cwd": &*session.session.project,
233        "sessionId": &session.session.id,
234        "type": row_role,
235        "message": Value::Object(envelope),
236        "uuid": message.message.id(),
237        "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
238    }))
239}
240
241fn claude_part(part: &Part) -> Value {
242    match &part.kind {
243        PartKind::Text { text } => json!({"type": "text", "text": extracted_text(text)}),
244        PartKind::Reasoning { text } => {
245            json!({"type": "thinking", "thinking": extracted_text(text)})
246        }
247        PartKind::ToolCall {
248            call_id,
249            name,
250            params,
251            provider_executed,
252        } => json!({
253            "type": if *provider_executed { "server_tool_use" } else { "tool_use" },
254            "id": extracted_text(call_id),
255            "name": extracted_text(name),
256            "input": params,
257        }),
258        PartKind::ToolResult {
259            call_id,
260            is_failure,
261            result,
262            ..
263        } => json!({
264            "type": "tool_result",
265            "tool_use_id": extracted_text(call_id),
266            "is_error": is_failure,
267            "content": result,
268        }),
269        PartKind::File {
270            media_type,
271            file_name,
272            data,
273        } => json!({
274            "type": "file",
275            "media_type": media_type,
276            "file_name": file_name,
277            "source": file_source(data),
278        }),
279        other => {
280            json!({"type": "text", "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null))})
281        }
282    }
283}
284
285fn file_source(data: &FileData) -> Value {
286    match data {
287        FileData::String(value) => json!({"type": "text", "data": value}),
288        FileData::Bytes(value) => json!({"type": "base64", "data": value}),
289        FileData::Url(value) => json!({"type": "url", "url": value}),
290    }
291}
292
/// Configured claude-code reader. Walks a tree of `*.jsonl` files under its
/// root directory and yields canonical events in source order per session.
#[derive(Debug, Clone)]
pub struct ClaudeCodeAdapter {
    /// Directory walked for transcripts (the default probe is `~/.claude/projects`).
    root: PathBuf,
}

impl ClaudeCodeAdapter {
    /// Creates a reader over the transcript tree rooted at `root`.
    pub fn new(root: impl Into<PathBuf>) -> Self {
        let root: PathBuf = root.into();
        Self { root }
    }
}
305
306impl Adapter for ClaudeCodeAdapter {
307    fn discover(&self) -> DiscoverFuture<'_> {
308        jsonl_tree_discover(self)
309    }
310
311    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
312        jsonl_tree_events(self, oracle)
313    }
314}
315
316impl JsonlTree for ClaudeCodeAdapter {
317    type State = FileState;
318
319    fn name(&self) -> &'static str {
320        NAME
321    }
322
323    fn root(&self) -> &Path {
324        &self.root
325    }
326
327    fn peek_session_id(&self, path: &Path, first_line: &str) -> Option<String> {
328        // A file under `subagents/` takes its id from the path, never from the
329        // row's content `sessionId` (that's the parent's). A recognized child
330        // peeks to its child id; an unrecognized one returns `None` so it stays
331        // out of the freshness gate and its `unsupported_reason` failure
332        // re-surfaces on every sync rather than being skipped as `Fresh` under
333        // the parent's borrowed watermark. See spec.md#datasets.
334        if subagents_dir(path).is_some() {
335            let (parent_uuid, child_suffix, _) = subagent_ids(path)?;
336            return Some(format!("{parent_uuid}/{child_suffix}"));
337        }
338        let row: Value = serde_json::from_str(first_line).ok()?;
339        row.get("sessionId")?.as_str().map(ToOwned::to_owned)
340    }
341
342    fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
343        session_from_rows(path, rows)
344    }
345
346    fn events_from_row(
347        &self,
348        session: &Session,
349        row: &BoundedRow,
350        state: &mut Self::State,
351    ) -> Result<Vec<IngestEvent>, String> {
352        if let Some(uuid) = row.value.get("uuid").and_then(Value::as_str)
353            && !state.seen_uuids.insert(uuid.to_owned())
354        {
355            return Ok(Vec::new());
356        }
357        capture_tool_call_names(&row.value, &mut state.tool_call_names);
358        events_from_row(&session.id, row.line, &row.value, session.created_at, state)
359    }
360
361    fn unsupported_reason(&self, path: &Path) -> Option<String> {
362        // A `.jsonl` under a `subagents/` ancestor that we can't resolve to a
363        // child id (its leaf isn't `agent-<hash>.jsonl`) must NOT fall back to
364        // its content `sessionId` - that id is the parent's, so it would
365        // silently merge into the parent session. Fail visibly and wait for an
366        // adapter update instead - EXCEPT the Workflow runner's `journal.jsonl`,
367        // a known control file that carries no `sessionId`, so it falls through
368        // to `session()` and is dropped as a benign Empty skip rather than
369        // flagged as an unrecognized transcript layout. See spec.md#datasets.
370        if subagents_dir(path).is_some()
371            && subagent_ids(path).is_none()
372            && !is_workflow_control_file(path)
373        {
374            return Some(format!(
375                "{}: subagent transcript layout not recognized by this pond version; \
376                 skipped so it is not merged into the parent session - update pond and \
377                 re-run `pond sync`",
378                path.display()
379            ));
380        }
381        None
382    }
383}
384
385/// The Workflow runner writes `journal.jsonl` (its resume/cache journal of agent
386/// `started`/`result` events) beside the `agent-<hash>.jsonl` transcripts under
387/// `subagents/workflows/<wf-id>/`. It carries no `sessionId` and only duplicates
388/// content already in those transcripts, so it is a control file to ignore (a
389/// benign Empty skip), not an unrecognized transcript layout. See spec.md#datasets.
390fn is_workflow_control_file(path: &Path) -> bool {
391    subagents_dir(path).is_some()
392        && path.file_name().and_then(|n| n.to_str()) == Some("journal.jsonl")
393}
394
395/// Walk one raw row's `message.content[]` array (if any) and stash every
396/// `tool_use` part's `id -> name` mapping into the per-file map. Idempotent
397/// and safe to call on every row regardless of role; non-assistant rows
398/// just don't contribute entries.
399fn capture_tool_call_names(row: &Value, map: &mut HashMap<String, Extracted<String>>) {
400    let Some(items) = row
401        .get("message")
402        .and_then(|message| message.get("content"))
403        .and_then(Value::as_array)
404    else {
405        return;
406    };
407    for item in items {
408        let kind = item.get("type").and_then(Value::as_str);
409        if !matches!(kind, Some("tool_use") | Some("server_tool_use")) {
410            continue;
411        }
412        let (Some(id), Some(name)) = (item.str_field("id"), extract_str(item, "name")) else {
413            continue;
414        };
415        map.insert(id.to_owned(), name);
416    }
417}
418
419fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
420    let path_display = path.display().to_string();
421    // A non-agent leaf under `subagents/` (e.g. the Workflow runner's
422    // journal.jsonl) would borrow the parent's content `sessionId` and silently
423    // merge; refuse structurally rather than rely on the row lacking one.
424    // spec.md#datasets.
425    if subagents_dir(path).is_some() && subagent_ids(path).is_none() {
426        return Err(AdapterError::schema(
427            NAME,
428            path_display,
429            "sidecar/control file under subagents/ has no session of its own",
430        ));
431    }
432    let mut created_at = None;
433    let mut project: Option<Extracted<String>> = None;
434    let mut version = None;
435    for row in rows {
436        if created_at.is_none() {
437            created_at = parse_timestamp(&row.value).ok();
438        }
439        if project.is_none() {
440            project = extract_str(&row.value, "cwd");
441        }
442        if version.is_none() {
443            version = row
444                .value
445                .get("version")
446                .and_then(Value::as_str)
447                .map(ToOwned::to_owned);
448        }
449    }
450
451    let first = rows
452        .first()
453        .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
454    let at_first = format!("{path_display}:{}", first.line);
455    let raw_session_id = first
456        .value
457        .get("sessionId")
458        .and_then(Value::as_str)
459        .ok_or_else(|| {
460            AdapterError::schema(
461                NAME,
462                at_first.clone(),
463                format!("line {} missing sessionId", first.line),
464            )
465        })?
466        .to_owned();
467    let created_at = created_at.ok_or_else(|| {
468        AdapterError::schema(NAME, at_first, "session has no parseable timestamp")
469    })?;
470
471    // Subagent detection. Claude Code stores each subagent's transcript under
472    // the session's `subagents/` sidecar - either flat
473    // (`<parent_dir>/<parent_uuid>/subagents/agent-<hash>.jsonl`) or, for the
474    // workflow runner, nested
475    // (`.../subagents/workflows/<wf-id>/agent-<hash>.jsonl`) - with a sibling
476    // `agent-<hash>.meta.json` carrying `{agentType, description}`. Every such
477    // file shares the parent's `sessionId` in row content, so ingesting it under
478    // that id collides with the parent (the validator's "project is immutable"
479    // rule rejects a cwd-shifted one, and a same-cwd one silently merges). The
480    // fix is to derive a child id from the path - keyed off the `subagents/`
481    // ancestor at any depth - and link back via `parent_session_id`. See
482    // spec.md#datasets.
483    let subagent = subagent_descriptor(path);
484    let project_dir = source_project_dir(path, subagent.is_some());
485    let (session_id, parent_session_id, source_agent, subagent_options) = match subagent {
486        Some(SubagentDescriptor {
487            parent_uuid,
488            child_suffix,
489            agent_hash,
490            agent_type,
491            meta,
492        }) => {
493            let child_id = format!("{parent_uuid}/{child_suffix}");
494            let agent_label = agent_type
495                .as_deref()
496                .map(|t| format!("claude-code/{t}"))
497                .unwrap_or_else(|| "claude-code/subagent".to_owned());
498            // `meta` is the verbatim `.meta.json`; `hash` and `raw_session_id`
499            // are pond-derived (filename hash + parent sessionId). Storing the
500            // whole meta keeps native restore of the sidecar lossless.
501            let metadata = json!({
502                "hash": agent_hash,
503                "raw_session_id": raw_session_id,
504                "meta": meta,
505            });
506            (child_id, Some(parent_uuid), agent_label, Some(metadata))
507        }
508        None => (raw_session_id, None, "claude-code".to_owned(), None),
509    };
510
511    let project = match project {
512        Some(value) => value,
513        None => {
514            let decoded = path
515                .parent()
516                .and_then(|p| p.file_name())
517                .and_then(|n| n.to_str())
518                .map(|s| s.replace('-', "/"))
519                .ok_or_else(|| {
520                    AdapterError::schema(
521                        NAME,
522                        path_display.clone(),
523                        "no `cwd` field in any row and source path is not UTF-8",
524                    )
525                })?;
526            extract_self_str(&Value::String(decoded)).ok_or_else(|| {
527                AdapterError::schema(
528                    NAME,
529                    path_display.clone(),
530                    "internal: Value::String produced None from Source::as_str",
531                )
532            })?
533        }
534    };
535
536    let mut options = ProviderOptions::new();
537    options.insert(
538        "source".to_owned(),
539        json!({
540            "adapter": "claude-code",
541            "version": version,
542            "project_dir": project_dir,
543            "workspace_path": &*project,
544        }),
545    );
546    if let Some(metadata) = subagent_options {
547        options.insert("subagent".to_owned(), metadata);
548    }
549
550    Ok(Session {
551        id: session_id,
552        parent_session_id,
553        parent_message_id: None,
554        source_agent,
555        created_at,
556        project,
557        options,
558    })
559}
560
561fn source_project_dir(path: &Path, is_subagent: bool) -> Option<String> {
562    // The project dir is the grandparent of `subagents/` regardless of how
563    // deeply the transcript nests below it (`.../<project>/<parent_uuid>/
564    // subagents/...`), so climb from the `subagents/` ancestor rather than a
565    // fixed number of `.parent()` hops.
566    let project_dir = if is_subagent {
567        subagents_dir(path)?.parent()?.parent()
568    } else {
569        path.parent()
570    };
571    project_dir
572        .and_then(|p| p.file_name())
573        .and_then(|n| n.to_str())
574        .map(ToOwned::to_owned)
575}
576
/// The nearest directory named `subagents` among `path`'s ancestors (the path
/// itself excluded), if any.
///
/// This does not depend on depth. It matches both the flat
/// `<parent_uuid>/subagents/agent-<hash>.jsonl` layout and the nested workflow
/// `<parent_uuid>/subagents/workflows/<wf-id>/agent-<hash>.jsonl` layout. The
/// directory directly above the match is the parent session uuid.
fn subagents_dir(path: &Path) -> Option<&Path> {
    path.ancestors()
        .skip(1)
        .find(|dir| dir.file_name().and_then(|n| n.to_str()) == Some("subagents"))
}
591
/// Resolved metadata for one subagent JSONL file. `agent_type` is read from
/// the sibling `.meta.json` for the `source_agent` label; `meta` keeps that
/// file's full verbatim content so native restore reproduces it
/// (spec.md#adapter-native-restore-lossless). Both are `None` when the meta file is
/// absent or unreadable (the label falls back to `claude-code/subagent`);
/// `agent_type` is also `None` when the meta file parses but has no string
/// `agentType`.
struct SubagentDescriptor {
    /// Name of the directory directly above `subagents/` (the parent session uuid).
    parent_uuid: String,
    /// Path below `subagents/` with `.jsonl` stripped, `/`-separated.
    child_suffix: String,
    /// The `<hash>` part of the `agent-<hash>.jsonl` file name.
    agent_hash: String,
    /// `agentType` string from the sibling `.meta.json`, if any.
    agent_type: Option<String>,
    /// Entire parsed `.meta.json`, kept verbatim for native restore.
    meta: Option<Value>,
}
604
605/// `(parent_uuid, child_suffix, agent_hash)` for a subagent transcript, or
606/// `None` for any path without a `subagents/` ancestor or a non-`agent-<hash>`
607/// leaf (the common case: top-level session files). `child_suffix` is the file's
608/// path relative to its `subagents/` ancestor with `.jsonl` stripped -
609/// `agent-<hash>` flat, `workflows/<wf-id>/agent-<hash>` nested - so the derived
610/// child id `<parent_uuid>/<child_suffix>` round-trips back to the on-disk path
611/// on native restore. `agent_hash` keys the sibling `.meta.json` lookup.
612fn subagent_ids(path: &Path) -> Option<(String, String, String)> {
613    let file_name = path.file_name()?.to_str()?;
614    let agent_hash = file_name
615        .strip_prefix("agent-")?
616        .strip_suffix(".jsonl")?
617        .to_owned();
618    let subagents = subagents_dir(path)?;
619    let parent_uuid = subagents.parent()?.file_name()?.to_str()?.to_owned();
620    // The child id must be `/`-canonical on every platform (the rest of the
621    // adapter and `claude_relative_path` assume `/`), but a relative path carries
622    // the OS separator - normalize it. No-op on POSIX.
623    let child_suffix = path
624        .strip_prefix(subagents)
625        .ok()?
626        .with_extension("")
627        .to_str()?
628        .replace(std::path::MAIN_SEPARATOR, "/");
629    Some((parent_uuid, child_suffix, agent_hash))
630}
631
632/// [`subagent_ids`] plus the sibling `agent-<hash>.meta.json` - `agentType` for
633/// the `source_agent` label, the whole file for lossless sidecar restore.
634fn subagent_descriptor(path: &Path) -> Option<SubagentDescriptor> {
635    let (parent_uuid, child_suffix, agent_hash) = subagent_ids(path)?;
636    let meta_path = path.parent()?.join(format!("agent-{agent_hash}.meta.json"));
637    let (agent_type, meta) = match std::fs::read(&meta_path) {
638        Ok(bytes) => match serde_json::from_slice::<Value>(&bytes) {
639            Ok(value) => (
640                value
641                    .get("agentType")
642                    .and_then(Value::as_str)
643                    .map(ToOwned::to_owned),
644                Some(value),
645            ),
646            Err(error) => {
647                tracing::debug!(
648                    target: "pond::adapter::claude_code",
649                    meta = %meta_path.display(),
650                    %error,
651                    "subagent .meta.json present but unparseable; falling back to 'claude-code/subagent'",
652                );
653                (None, None)
654            }
655        },
656        Err(error) if error.kind() == std::io::ErrorKind::NotFound => (None, None),
657        Err(error) => {
658            tracing::debug!(
659                target: "pond::adapter::claude_code",
660                meta = %meta_path.display(),
661                %error,
662                "subagent .meta.json IO error; falling back to 'claude-code/subagent'",
663            );
664            (None, None)
665        }
666    };
667
668    Some(SubagentDescriptor {
669        parent_uuid,
670        child_suffix,
671        agent_hash,
672        agent_type,
673        meta,
674    })
675}
676
677fn events_from_row(
678    session_id: &str,
679    line: usize,
680    row: &Value,
681    default_timestamp: DateTime<Utc>,
682    state: &FileState,
683) -> Result<Vec<IngestEvent>, String> {
684    let timestamp = parse_timestamp(row).unwrap_or(default_timestamp);
685    let uuid = row
686        .get("uuid")
687        .and_then(Value::as_str)
688        .map_or_else(|| format!("{session_id}:{line}"), ToOwned::to_owned);
689
690    if let Some(message_value) = row.get("message") {
691        return message_events(
692            session_id,
693            &uuid,
694            timestamp,
695            row,
696            message_value,
697            state,
698            line,
699        );
700    }
701
702    // Rows with no `message` field are session-metadata records:
703    // `queue-operation`, `permission-mode`, `last-prompt`, `attachment`,
704    // `progress`, `system`, `custom-title`, etc. We preserve them as
705    // System messages with the row's compact JSON in `content` so a future
706    // exporter could reconstruct the original transcript; the `subtype`
707    // becomes the human label via `options.source.raw_type`.
708    let raw_type = row.get("type").and_then(Value::as_str);
709    let content = if raw_type == Some("attachment") {
710        row.get("attachment")
711            .and_then(attachment_content)
712            .or_else(|| Some(extract_compact_repr(row)))
713    } else {
714        extract_str(row, "subtype").or_else(|| extract_str(row, "type"))
715    };
716    let message = Message::System {
717        id: uuid,
718        session_id: session_id.to_owned(),
719        timestamp,
720        content,
721        options: row_options(row, line),
722    };
723    Ok(vec![IngestEvent::Message(message)])
724}
725
726fn message_events(
727    session_id: &str,
728    uuid: &str,
729    timestamp: DateTime<Utc>,
730    row: &Value,
731    message_value: &Value,
732    state: &FileState,
733    line: usize,
734) -> Result<Vec<IngestEvent>, String> {
735    let role = message_value
736        .get("role")
737        .and_then(Value::as_str)
738        .ok_or_else(|| "message missing role".to_owned())?;
739    let content = message_value.get("content").unwrap_or(&Value::Null);
740    let mut parts = Vec::new();
741    let message = match (role, content) {
742        ("user", Value::String(text)) => {
743            // spec.md#model-part-provenance: a user-slot turn is conversation only
744            // when it is a genuine human prompt; harness-injected wrappers and
745            // `isMeta` rows are scaffolding.
746            let provenance = user_text_provenance(row, text);
747            parts.push(text_part(
748                session_id,
749                uuid,
750                0,
751                extract_self_str(content),
752                provenance,
753            ));
754            Message::User {
755                id: uuid.to_owned(),
756                session_id: session_id.to_owned(),
757                timestamp,
758                options: row_options(row, line),
759            }
760        }
761        ("user", Value::Array(items)) if items.iter().all(is_tool_result) => {
762            let source_tool_result = row.get("toolUseResult").cloned();
763            parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
764                tool_result_part(
765                    session_id,
766                    uuid,
767                    ordinal,
768                    item,
769                    source_tool_result.as_ref(),
770                    state,
771                )
772            }));
773            Message::Tool {
774                id: uuid.to_owned(),
775                session_id: session_id.to_owned(),
776                timestamp,
777                options: row_options(row, line),
778            }
779        }
780        ("user", Value::Array(items)) => {
781            // Classify the whole user message once: v1 claude-code never mixes
782            // provenance within a single message (spec.md#model-part-provenance).
783            let provenance = user_array_provenance(row, items);
784            parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
785                user_part(session_id, uuid, ordinal, item, state, provenance)
786            }));
787            Message::User {
788                id: uuid.to_owned(),
789                session_id: session_id.to_owned(),
790                timestamp,
791                options: row_options(row, line),
792            }
793        }
794        ("assistant", Value::Array(items)) => {
795            parts.extend(
796                items
797                    .iter()
798                    .enumerate()
799                    .map(|(ordinal, item)| assistant_part(session_id, uuid, ordinal, item)),
800            );
801            Message::Assistant {
802                id: uuid.to_owned(),
803                session_id: session_id.to_owned(),
804                timestamp,
805                options: assistant_options(row, message_value, line),
806            }
807        }
808        ("system", Value::String(_)) => Message::System {
809            id: uuid.to_owned(),
810            session_id: session_id.to_owned(),
811            timestamp,
812            content: extract_self_str(content),
813            options: row_options(row, line),
814        },
815        ("system", _) => Message::System {
816            id: uuid.to_owned(),
817            session_id: session_id.to_owned(),
818            timestamp,
819            // Fallback for system messages without a string content: serialize
820            // the structured body as JSON. This is not a synthesized value
821            // (the row genuinely had this content), just a lossless string
822            // encoding of structured data.
823            content: Some(extract_compact_repr(message_value)),
824            options: row_options(row, line),
825        },
826        (other, _) => {
827            return Err(format!("unsupported message role {other}"));
828        }
829    };
830
831    let mut events = Vec::with_capacity(parts.len() + 1);
832    events.push(IngestEvent::Message(message));
833    events.extend(parts.into_iter().map(IngestEvent::Part));
834    Ok(events)
835}
836
837fn text_part(
838    session_id: &str,
839    message_id: &str,
840    ordinal: usize,
841    text: Option<Extracted<String>>,
842    provenance: Provenance,
843) -> Part {
844    Part {
845        session_id: session_id.to_owned(),
846        id: part_id(message_id, ordinal),
847        message_id: message_id.to_owned(),
848        ordinal: part_ordinal(ordinal),
849        provenance,
850        options: empty_options(),
851        kind: PartKind::Text { text },
852    }
853}
854
855fn user_part(
856    session_id: &str,
857    message_id: &str,
858    ordinal: usize,
859    value: &Value,
860    state: &FileState,
861    provenance: Provenance,
862) -> Part {
863    match value.get("type").and_then(Value::as_str) {
864        Some("text") => text_part(
865            session_id,
866            message_id,
867            ordinal,
868            extract_str(value, "text"),
869            provenance,
870        ),
871        Some("image") | Some("file") => {
872            file_part(session_id, message_id, ordinal, value, provenance)
873        }
874        Some("tool_result") => {
875            tool_result_part(session_id, message_id, ordinal, value, None, state)
876        }
877        // Unknown user part shapes: preserve the raw JSON in the Text slot
878        // rather than dropping. This is not a synthesized value - it's a
879        // lossless encoding of structured data the schema doesn't model.
880        _ => text_part(
881            session_id,
882            message_id,
883            ordinal,
884            Some(extract_compact_repr(value)),
885            provenance,
886        ),
887    }
888}
889
890fn assistant_part(session_id: &str, message_id: &str, ordinal: usize, value: &Value) -> Part {
891    // spec.md#model-part-provenance: assistant content - text, reasoning, tool calls -
892    // is model-authored, hence conversational. `tool_result` parts never appear
893    // on an assistant message.
894    match value.get("type").and_then(Value::as_str) {
895        Some("text") => text_part(
896            session_id,
897            message_id,
898            ordinal,
899            extract_str(value, "text"),
900            Provenance::Conversational,
901        ),
902        Some("thinking") => Part {
903            session_id: session_id.to_owned(),
904            id: part_id(message_id, ordinal),
905            message_id: message_id.to_owned(),
906            ordinal: part_ordinal(ordinal),
907            provenance: Provenance::Conversational,
908            options: signature_options(value),
909            kind: PartKind::Reasoning {
910                text: extract_str(value, "thinking"),
911            },
912        },
913        Some("tool_use") => Part {
914            session_id: session_id.to_owned(),
915            id: part_id(message_id, ordinal),
916            message_id: message_id.to_owned(),
917            ordinal: part_ordinal(ordinal),
918            provenance: Provenance::Conversational,
919            options: empty_options(),
920            kind: PartKind::ToolCall {
921                call_id: extract_str(value, "id"),
922                name: extract_str(value, "name"),
923                params: value.get("input").cloned().unwrap_or(Value::Null),
924                provider_executed: false,
925            },
926        },
927        Some("server_tool_use") => Part {
928            session_id: session_id.to_owned(),
929            id: part_id(message_id, ordinal),
930            message_id: message_id.to_owned(),
931            ordinal: part_ordinal(ordinal),
932            provenance: Provenance::Conversational,
933            options: empty_options(),
934            kind: PartKind::ToolCall {
935                call_id: extract_str(value, "id"),
936                name: extract_str(value, "name"),
937                params: value.get("input").cloned().unwrap_or(Value::Null),
938                provider_executed: true,
939            },
940        },
941        Some("image") | Some("file") => file_part(
942            session_id,
943            message_id,
944            ordinal,
945            value,
946            Provenance::Conversational,
947        ),
948        // Same rationale as `user_part`'s fallback: lossless encoding of
949        // an unrecognised structured shape, not synthesised data.
950        _ => text_part(
951            session_id,
952            message_id,
953            ordinal,
954            Some(extract_compact_repr(value)),
955            Provenance::Conversational,
956        ),
957    }
958}
959
960fn tool_result_part(
961    session_id: &str,
962    message_id: &str,
963    ordinal: usize,
964    value: &Value,
965    source_tool_result: Option<&Value>,
966    state: &FileState,
967) -> Part {
968    let call_id = extract_str(value, "tool_use_id");
969    // `tool_result` source rows don't carry the tool name; it's resolved
970    // via the per-file `tool_use_id -> name` map. Misses (compaction pruned
971    // the originating `tool_use`) surface as `None` per spec.md#model-no-synthesis
972    // (schema-honesty: the field is `Option<Extracted<T>>`, not a fabricated
973    // string).
974    let name = value
975        .str_field("tool_use_id")
976        .and_then(|id| state.tool_call_names.get(id))
977        .cloned();
978    let result = value
979        .get("content")
980        .cloned()
981        .or_else(|| source_tool_result.cloned())
982        .unwrap_or(Value::Null);
983    Part {
984        session_id: session_id.to_owned(),
985        id: part_id(message_id, ordinal),
986        message_id: message_id.to_owned(),
987        ordinal: part_ordinal(ordinal),
988        // spec.md#model-part-provenance: tool output is runtime-produced, not
989        // conversation.
990        provenance: Provenance::Injected,
991        options: empty_options(),
992        kind: PartKind::ToolResult {
993            call_id,
994            name,
995            is_failure: value
996                .get("is_error")
997                .and_then(Value::as_bool)
998                .unwrap_or(false),
999            result,
1000        },
1001    }
1002}
1003
1004fn file_part(
1005    session_id: &str,
1006    message_id: &str,
1007    ordinal: usize,
1008    value: &Value,
1009    provenance: Provenance,
1010) -> Part {
1011    let media_type = value
1012        .get("media_type")
1013        .or_else(|| value.get("mime_type"))
1014        .and_then(Value::as_str)
1015        .map(ToOwned::to_owned);
1016    let file_name = value
1017        .get("file_name")
1018        .or_else(|| value.get("name"))
1019        .and_then(Value::as_str)
1020        .map(ToOwned::to_owned);
1021    let data = if let Some(source) = value.get("source") {
1022        if let Some(url) = source.get("url").and_then(Value::as_str) {
1023            FileData::Url(url.to_owned())
1024        } else if let Some(bytes) = source.get("data").and_then(Value::as_str) {
1025            FileData::String(bytes.to_owned())
1026        } else {
1027            FileData::String(compact_json(source))
1028        }
1029    } else if let Some(url) = value.get("url").and_then(Value::as_str) {
1030        FileData::Url(url.to_owned())
1031    } else {
1032        FileData::String(compact_json(value))
1033    };
1034
1035    Part {
1036        session_id: session_id.to_owned(),
1037        id: part_id(message_id, ordinal),
1038        message_id: message_id.to_owned(),
1039        ordinal: part_ordinal(ordinal),
1040        provenance,
1041        options: empty_options(),
1042        kind: PartKind::File {
1043            media_type,
1044            file_name,
1045            data,
1046        },
1047    }
1048}
1049
1050fn row_options(row: &Value, line: usize) -> ProviderOptions {
1051    let mut options = ProviderOptions::new();
1052    let source = json!({
1053        "line": line,
1054        "parent_uuid": row.get("parentUuid"),
1055        "is_sidechain": row.get("isSidechain"),
1056        "user_type": row.get("userType"),
1057        "entrypoint": row.get("entrypoint"),
1058        "cwd": row.get("cwd"),
1059        "version": row.get("version"),
1060        "git_branch": row.get("gitBranch"),
1061        "request_id": row.get("requestId"),
1062        "raw_type": row.get("type"),
1063        "raw_record": extract_raw_record(row),
1064    });
1065    options.insert("source".to_owned(), source);
1066    options
1067}
1068
1069fn assistant_options(row: &Value, message_value: &Value, line: usize) -> ProviderOptions {
1070    let mut options = row_options(row, line);
1071    let anthropic = json!({
1072        "id": message_value.get("id"),
1073        "model": message_value.get("model"),
1074        "stop_reason": message_value.get("stop_reason"),
1075        "stop_sequence": message_value.get("stop_sequence"),
1076        "usage": message_value.get("usage"),
1077    });
1078    options.insert("anthropic".to_owned(), anthropic);
1079    options
1080}
1081
1082fn signature_options(value: &Value) -> ProviderOptions {
1083    let mut options = ProviderOptions::new();
1084    if let Some(signature) = value.get("signature").and_then(Value::as_str) {
1085        options.insert("anthropic".to_owned(), json!({"signature": signature}));
1086    }
1087    options
1088}
1089
1090fn attachment_content(value: &Value) -> Option<Extracted<String>> {
1091    extract_str(value, "content").or_else(|| extract_str(value, "stdout"))
1092}
1093
1094fn parse_timestamp(value: &Value) -> anyhow::Result<DateTime<Utc>> {
1095    let timestamp = value
1096        .get("timestamp")
1097        .and_then(Value::as_str)
1098        .context("missing timestamp")?;
1099    Ok(DateTime::parse_from_rfc3339(timestamp)
1100        .context("invalid timestamp")?
1101        .with_timezone(&Utc))
1102}
1103
1104fn is_tool_result(value: &Value) -> bool {
1105    value.get("type").and_then(Value::as_str) == Some("tool_result")
1106}
1107
1108/// True when the row carries `isMeta: true` - claude-code's marker for an
1109/// expanded skill or command body injected into a user slot.
1110fn is_meta_row(row: &Value) -> bool {
1111    row.get("isMeta").and_then(Value::as_bool) == Some(true)
1112}
1113
/// True when `text` (ignoring leading whitespace) opens with one of the
/// harness-injected wrappers claude-code places inside a user-slot turn
/// (spec.md#model-part-provenance). These are task notifications,
/// slash-command echoes, local-command caveats/stdout, and interrupt notices.
fn is_injected_user_text(text: &str) -> bool {
    const HARNESS_PREFIXES: [&str; 7] = [
        "<task-notification>",
        "<command-name>",
        "<command-message>",
        "<command-args>",
        "<local-command-caveat>",
        "<local-command-stdout>",
        "[Request interrupted by user",
    ];
    let body = text.trim_start();
    HARNESS_PREFIXES
        .iter()
        .any(|&prefix| body.starts_with(prefix))
}
1127
1128/// Provenance of a string-content user message: `injected` for an `isMeta`
1129/// row or a harness wrapper, `conversational` for a genuine human prompt.
1130fn user_text_provenance(row: &Value, text: &str) -> Provenance {
1131    if is_meta_row(row) || is_injected_user_text(text) {
1132        Provenance::Injected
1133    } else {
1134        Provenance::Conversational
1135    }
1136}
1137
1138/// Provenance of an array-content user message. `isMeta` flags the whole row;
1139/// otherwise a leading text item carrying a harness wrapper marks it injected.
1140/// v1 claude-code never interleaves both within one message.
1141fn user_array_provenance(row: &Value, items: &[Value]) -> Provenance {
1142    if is_meta_row(row) {
1143        return Provenance::Injected;
1144    }
1145    let wrapped = items.iter().any(|item| {
1146        item.get("type").and_then(Value::as_str) == Some("text")
1147            && item
1148                .get("text")
1149                .and_then(Value::as_str)
1150                .is_some_and(is_injected_user_text)
1151    });
1152    if wrapped {
1153        Provenance::Injected
1154    } else {
1155        Provenance::Conversational
1156    }
1157}
1158
1159#[cfg(test)]
1160mod tests {
1161    //! Conformance tests for the claude-code adapter's data-shape contract:
1162    //! subagent path derivation, replay dedup, tool-name resolution, and the
1163    //! "no synthesized values" invariant (spec.md#model-no-synthesis, spec.md#model-schema-honesty, and spec.md#model-lossless-projection).
1164    //!
1165    //! Each test builds a tiny synthetic corpus under a `TempDir` so the
1166    //! assertions exercise the real adapter end-to-end without depending on
1167    //! committed fixtures.
1168    #![allow(clippy::expect_used, clippy::unwrap_used)]
1169
1170    use super::*;
1171    use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
1172    use tempfile::TempDir;
1173
1174    const FIXTURE_ROOT: &str = "tests/fixtures/adapter/claude_code/projects";
1175
1176    #[test]
1177    fn probe_default_finds_claude_projects_under_home() -> anyhow::Result<()> {
1178        crate::adapter::test_support::assert_probe_default(
1179            &ClaudeCodeFactory,
1180            &[".claude", "projects"],
1181        )
1182    }
1183
1184    #[tokio::test(flavor = "multi_thread")]
1185    async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
1186        let adapter = ClaudeCodeAdapter::new(FIXTURE_ROOT);
1187        crate::adapter::test_support::assert_native_restore(
1188            &ClaudeCodeFactory,
1189            &adapter,
1190            std::path::Path::new(FIXTURE_ROOT),
1191        )
1192        .await
1193    }
1194
1195    /// `<root>/<encoded-cwd>/<parent_uuid>.jsonl` plus
1196    /// `<root>/<encoded-cwd>/<parent_uuid>/subagents/agent-<hash>.jsonl` plus
1197    /// `agent-<hash>.meta.json`. The subagent file must:
1198    ///   - emit a Session whose `id = "{parent_uuid}/agent-{hash}"`
1199    ///   - have `parent_session_id = Some(parent_uuid)`
1200    ///   - have `source_agent = "claude-code/{agentType}"` from the meta file
1201    ///   - have `options.subagent` carrying the hash + agent_type + description
1202    #[tokio::test(flavor = "multi_thread")]
1203    async fn subagent_file_derives_child_session_with_parent_link() -> anyhow::Result<()> {
1204        let corpus = TempDir::new()?;
1205        let project_dir = corpus.path().join("-tmp-pond-test");
1206        let parent_uuid = "11111111-1111-1111-1111-111111111111";
1207        let agent_hash = "abc123def456";
1208        std::fs::create_dir_all(project_dir.join(parent_uuid).join("subagents"))?;
1209
1210        // Parent session file (one user row to anchor a Session).
1211        let parent_row = serde_json::json!({
1212            "type": "user",
1213            "uuid": "u-parent-1",
1214            "sessionId": parent_uuid,
1215            "cwd": "/tmp/pond-test",
1216            "timestamp": "2026-05-16T00:00:00.000Z",
1217            "version": "2.1.121",
1218            "message": {"role": "user", "content": "hi parent"},
1219        });
1220        std::fs::write(
1221            project_dir.join(format!("{parent_uuid}.jsonl")),
1222            format!("{parent_row}\n"),
1223        )?;
1224
1225        // Subagent file + sibling meta. Carries the SAME sessionId as the parent
1226        // in row content; the adapter must derive a child id from the path.
1227        let subagent_row = serde_json::json!({
1228            "type": "user",
1229            "uuid": "u-sub-1",
1230            "sessionId": parent_uuid,
1231            "cwd": "/tmp/pond-test",
1232            "isSidechain": true,
1233            "agentId": agent_hash,
1234            "timestamp": "2026-05-16T00:01:00.000Z",
1235            "version": "2.1.121",
1236            "message": {"role": "user", "content": "subagent prompt"},
1237        });
1238        std::fs::write(
1239            project_dir
1240                .join(parent_uuid)
1241                .join("subagents")
1242                .join(format!("agent-{agent_hash}.jsonl")),
1243            format!("{subagent_row}\n"),
1244        )?;
1245        std::fs::write(
1246            project_dir
1247                .join(parent_uuid)
1248                .join("subagents")
1249                .join(format!("agent-{agent_hash}.meta.json")),
1250            r#"{"agentType":"general-purpose","description":"do a thing"}"#,
1251        )?;
1252
1253        let store_dir = TempDir::new()?;
1254        let store = Store::open_local(store_dir.path()).await?;
1255        let adapter = ClaudeCodeAdapter::new(corpus.path());
1256
1257        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1258        assert_eq!(
1259            summary.dropped_sessions, 0,
1260            "subagent file must NOT collide with parent (pre-fix this was the project-immutable rejection)"
1261        );
1262
1263        let parent = store
1264            .get_session(parent_uuid)
1265            .await?
1266            .expect("parent session should ingest as the bare uuid");
1267        assert_eq!(parent.session.source_agent, "claude-code");
1268        assert_eq!(parent.session.parent_session_id, None);
1269
1270        let child_id = format!("{parent_uuid}/agent-{agent_hash}");
1271        let child = store
1272            .get_session(&child_id)
1273            .await?
1274            .expect("subagent session must surface under the derived id");
1275        assert_eq!(
1276            child.session.source_agent, "claude-code/general-purpose",
1277            "agent_type from .meta.json should suffix the source_agent label"
1278        );
1279        assert_eq!(
1280            child.session.parent_session_id.as_deref(),
1281            Some(parent_uuid),
1282            "subagent must link back to parent via parent_session_id",
1283        );
1284        let subagent_meta = child
1285            .session
1286            .options
1287            .get("subagent")
1288            .expect("options.subagent must carry the hash + verbatim meta.json");
1289        assert_eq!(subagent_meta["hash"], serde_json::json!(agent_hash));
1290        assert_eq!(
1291            subagent_meta["meta"]["agentType"],
1292            serde_json::json!("general-purpose")
1293        );
1294        assert_eq!(
1295            subagent_meta["meta"]["description"],
1296            serde_json::json!("do a thing")
1297        );
1298        Ok(())
1299    }
1300
1301    /// Subagent file present but the sibling `.meta.json` is missing. The
1302    /// adapter must still derive a child session (so it doesn't collide with
1303    /// the parent) and fall back to `source_agent = "claude-code/subagent"`.
1304    #[tokio::test(flavor = "multi_thread")]
1305    async fn subagent_without_meta_falls_back_to_generic_label() -> anyhow::Result<()> {
1306        let corpus = TempDir::new()?;
1307        let project_dir = corpus.path().join("-tmp-pond-test");
1308        let parent_uuid = "22222222-2222-2222-2222-222222222222";
1309        let agent_hash = "deadbeef";
1310        std::fs::create_dir_all(project_dir.join(parent_uuid).join("subagents"))?;
1311        let row = serde_json::json!({
1312            "type": "user",
1313            "uuid": "u-sub-only",
1314            "sessionId": parent_uuid,
1315            "cwd": "/tmp/pond-test",
1316            "timestamp": "2026-05-16T00:00:00.000Z",
1317            "message": {"role": "user", "content": "no meta sibling here"},
1318        });
1319        std::fs::write(
1320            project_dir
1321                .join(parent_uuid)
1322                .join("subagents")
1323                .join(format!("agent-{agent_hash}.jsonl")),
1324            format!("{row}\n"),
1325        )?;
1326
1327        let store_dir = TempDir::new()?;
1328        let store = Store::open_local(store_dir.path()).await?;
1329        let adapter = ClaudeCodeAdapter::new(corpus.path());
1330        let _summary =
1331            ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1332
1333        let child = store
1334            .get_session(&format!("{parent_uuid}/agent-{agent_hash}"))
1335            .await?
1336            .expect("derived child id even without meta");
1337        assert_eq!(child.session.source_agent, "claude-code/subagent");
1338        Ok(())
1339    }
1340
1341    /// Nested workflow-runner subagent:
1342    ///   `<parent_uuid>/subagents/workflows/<wf-id>/agent-<hash>.jsonl`.
1343    /// Same parent `sessionId` in row content AND a shifted `cwd`. The adapter
1344    /// must derive a distinct child id from the FULL path under `subagents/`
1345    /// (not collapse onto the parent), so it neither collides on the immutable
1346    /// `project` nor silently merges into the parent. Regression for the
1347    /// workflow-layout sync rejection. See spec.md#datasets.
1348    #[tokio::test(flavor = "multi_thread")]
1349    async fn workflow_nested_subagent_derives_distinct_child_not_parent_collision()
1350    -> anyhow::Result<()> {
1351        let corpus = TempDir::new()?;
1352        let project_dir = corpus.path().join("-tmp-pond-test");
1353        let parent_uuid = "44444444-4444-4444-4444-444444444444";
1354        let wf_id = "wf_abcd1234-ef0";
1355        let agent_hash = "cafef00dbaadf00d1";
1356        let wf_dir = project_dir
1357            .join(parent_uuid)
1358            .join("subagents")
1359            .join("workflows")
1360            .join(wf_id);
1361        std::fs::create_dir_all(&wf_dir)?;
1362
1363        let parent_row = serde_json::json!({
1364            "type": "user",
1365            "uuid": "u-parent-1",
1366            "sessionId": parent_uuid,
1367            "cwd": "/tmp/pond-test",
1368            "timestamp": "2026-05-20T00:00:00.000Z",
1369            "message": {"role": "user", "content": "hi parent"},
1370        });
1371        std::fs::write(
1372            project_dir.join(format!("{parent_uuid}.jsonl")),
1373            format!("{parent_row}\n"),
1374        )?;
1375
1376        // Shifted cwd: pre-fix this collided with the parent's immutable project.
1377        let subagent_row = serde_json::json!({
1378            "type": "user",
1379            "uuid": "u-wf-sub-1",
1380            "sessionId": parent_uuid,
1381            "cwd": "/tmp/pond-test/packages/sub",
1382            "isSidechain": true,
1383            "agentId": agent_hash,
1384            "timestamp": "2026-05-20T00:01:00.000Z",
1385            "message": {"role": "user", "content": "workflow subagent prompt"},
1386        });
1387        std::fs::write(
1388            wf_dir.join(format!("agent-{agent_hash}.jsonl")),
1389            format!("{subagent_row}\n"),
1390        )?;
1391        std::fs::write(
1392            wf_dir.join(format!("agent-{agent_hash}.meta.json")),
1393            r#"{"agentType":"general-purpose","description":"workflow child"}"#,
1394        )?;
1395
1396        let store_dir = TempDir::new()?;
1397        let store = Store::open_local(store_dir.path()).await?;
1398        let adapter = ClaudeCodeAdapter::new(corpus.path());
1399        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1400        assert_eq!(
1401            summary.dropped_sessions, 0,
1402            "nested workflow subagent must NOT collide with the parent project",
1403        );
1404
1405        let parent = store
1406            .get_session(parent_uuid)
1407            .await?
1408            .expect("parent session ingests under the bare uuid");
1409        assert_eq!(&*parent.session.project, "/tmp/pond-test");
1410        assert_eq!(parent.session.parent_session_id, None);
1411
1412        let child_id = format!("{parent_uuid}/workflows/{wf_id}/agent-{agent_hash}");
1413        let child = store
1414            .get_session(&child_id)
1415            .await?
1416            .expect("workflow subagent surfaces under the full nested child id");
1417        assert_eq!(child.session.source_agent, "claude-code/general-purpose");
1418        assert_eq!(
1419            child.session.parent_session_id.as_deref(),
1420            Some(parent_uuid)
1421        );
1422        assert_eq!(
1423            &*child.session.project, "/tmp/pond-test/packages/sub",
1424            "child keeps its own cwd-derived project, distinct from the parent",
1425        );
1426        let subagent_meta = child
1427            .session
1428            .options
1429            .get("subagent")
1430            .expect("options.subagent present");
1431        assert_eq!(subagent_meta["hash"], serde_json::json!(agent_hash));
1432        Ok(())
1433    }
1434
1435    /// A `.jsonl` under `subagents/` whose leaf is NOT `agent-<hash>.jsonl` (a
1436    /// layout this pond version doesn't understand) must FAIL VISIBLY rather
1437    /// than fall back to its content `sessionId` (the parent's) and silently
1438    /// merge into the parent session. It is counted as an unsupported skip and
1439    /// contributes no rows. See spec.md#datasets.
1440    #[tokio::test(flavor = "multi_thread")]
1441    async fn unrecognized_subagents_file_fails_visibly_not_merged() -> anyhow::Result<()> {
1442        let corpus = TempDir::new()?;
1443        let project_dir = corpus.path().join("-tmp-pond-test");
1444        let parent_uuid = "55555555-5555-5555-5555-555555555555";
1445        let unknown_dir = project_dir
1446            .join(parent_uuid)
1447            .join("subagents")
1448            .join("workflows")
1449            .join("wf_future01-aaa");
1450        std::fs::create_dir_all(&unknown_dir)?;
1451
1452        let parent_row = serde_json::json!({
1453            "type": "user",
1454            "uuid": "u-parent-only",
1455            "sessionId": parent_uuid,
1456            "cwd": "/tmp/pond-test",
1457            "timestamp": "2026-05-20T00:00:00.000Z",
1458            "message": {"role": "user", "content": "parent message"},
1459        });
1460        std::fs::write(
1461            project_dir.join(format!("{parent_uuid}.jsonl")),
1462            format!("{parent_row}\n"),
1463        )?;
1464
1465        // Same parent sessionId AND same cwd: pre-guard this would have merged
1466        // silently into the parent. The leaf name is not `agent-<hash>.jsonl`.
1467        let unknown_row = serde_json::json!({
1468            "type": "user",
1469            "uuid": "u-should-not-merge",
1470            "sessionId": parent_uuid,
1471            "cwd": "/tmp/pond-test",
1472            "timestamp": "2026-05-20T00:02:00.000Z",
1473            "message": {"role": "user", "content": "must not land under parent"},
1474        });
1475        std::fs::write(
1476            unknown_dir.join("transcript-001.jsonl"),
1477            format!("{unknown_row}\n"),
1478        )?;
1479
1480        let store_dir = TempDir::new()?;
1481        let store = Store::open_local(store_dir.path()).await?;
1482        let adapter = ClaudeCodeAdapter::new(corpus.path());
1483        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1484
1485        assert_eq!(
1486            summary.skipped_files, 1,
1487            "the unrecognized subagents/ transcript must be a visible, counted skip",
1488        );
1489        let parent = store
1490            .get_session(parent_uuid)
1491            .await?
1492            .expect("parent session ingests");
1493        assert_eq!(
1494            parent.messages.len(),
1495            1,
1496            "the unrecognized file's row must NOT be merged into the parent session",
1497        );
1498        assert!(
1499            parent
1500                .messages
1501                .iter()
1502                .all(|m| m.message.id() != "u-should-not-merge"),
1503            "parent must not absorb the unrecognized file's message",
1504        );
1505        Ok(())
1506    }
1507
1508    /// Re-sync visibility: an unrecognized `subagents/` file must STILL surface as
1509    /// a visible `Unsupported` skip when the parent already carries a freshness
1510    /// watermark. Its content `sessionId` is the parent's, so peeking it would let
1511    /// the freshness gate skip the file as `Fresh` under the parent's watermark and
1512    /// hide the failure. `peek_session_id` returns `None` for it instead, keeping
1513    /// it out of the gate. Regression for the re-sync visibility leak. See
1514    /// spec.md#datasets.
1515    #[tokio::test(flavor = "multi_thread")]
1516    async fn unrecognized_subagents_file_stays_visible_under_parent_watermark() -> anyhow::Result<()>
1517    {
1518        struct ParentAlreadyFresh;
1519        impl crate::adapter::SkipOracle for ParentAlreadyFresh {
1520            fn last_ingested_at(&self, _session_id: &str) -> Option<DateTime<Utc>> {
1521                // Far-future watermark: every source mtime is `<= ingested`, so a
1522                // peeked id WOULD trip the freshness gate. The guard must keep the
1523                // unrecognized file out of the gate regardless.
1524                Some(
1525                    DateTime::parse_from_rfc3339("2999-01-01T00:00:00Z")
1526                        .unwrap()
1527                        .with_timezone(&Utc),
1528                )
1529            }
1530            fn is_empty(&self) -> bool {
1531                false
1532            }
1533        }
1534
1535        let corpus = TempDir::new()?;
1536        let project_dir = corpus.path().join("-tmp-pond-test");
1537        let parent_uuid = "66666666-6666-6666-6666-666666666666";
1538        let unknown_dir = project_dir
1539            .join(parent_uuid)
1540            .join("subagents")
1541            .join("workflows")
1542            .join("wf_future02-bbb");
1543        std::fs::create_dir_all(&unknown_dir)?;
1544
1545        let parent_row = serde_json::json!({
1546            "type": "user",
1547            "uuid": "u-parent-fresh",
1548            "sessionId": parent_uuid,
1549            "cwd": "/tmp/pond-test",
1550            "timestamp": "2026-05-20T00:00:00.000Z",
1551            "message": {"role": "user", "content": "parent message"},
1552        });
1553        std::fs::write(
1554            project_dir.join(format!("{parent_uuid}.jsonl")),
1555            format!("{parent_row}\n"),
1556        )?;
1557
1558        // Same parent sessionId, leaf not `agent-<hash>.jsonl`: pre-fix this would
1559        // peek the parent's id and be fresh-skipped under the far-future watermark.
1560        let unknown_row = serde_json::json!({
1561            "type": "user",
1562            "uuid": "u-resync-should-stay-visible",
1563            "sessionId": parent_uuid,
1564            "cwd": "/tmp/pond-test",
1565            "timestamp": "2026-05-20T00:02:00.000Z",
1566            "message": {"role": "user", "content": "must stay visible"},
1567        });
1568        std::fs::write(
1569            unknown_dir.join("transcript-002.jsonl"),
1570            format!("{unknown_row}\n"),
1571        )?;
1572
1573        let store_dir = TempDir::new()?;
1574        let store = Store::open_local(store_dir.path()).await?;
1575        let adapter = ClaudeCodeAdapter::new(corpus.path());
1576        let summary = ingest_adapter(&store, &adapter, &ParentAlreadyFresh, |_| {}).await?;
1577
1578        assert_eq!(
1579            summary.skipped_files, 1,
1580            "the unrecognized transcript must stay a visible Unsupported skip, not be fresh-skipped under the parent's watermark",
1581        );
1582        // The parent file legitimately fresh-skips under the far-future watermark;
1583        // the unrecognized file must NOT join it (pre-fix `skipped_fresh` would be 2).
1584        assert_eq!(
1585            summary.skipped_fresh, 1,
1586            "only the parent may fresh-skip; the unrecognized file must not borrow its watermark",
1587        );
1588        Ok(())
1589    }
1590
1591    /// Three rows with the same `uuid` (the claude-code `/resume` replay
1592    /// pattern). The adapter must dedupe at the file-state level so the
1593    /// validator never sees the duplicates; `dropped_events` stays 0 and
1594    /// `inserted` covers the single canonical row.
1595    #[tokio::test(flavor = "multi_thread")]
1596    async fn replay_duplicates_are_dedup_at_adapter_layer() -> anyhow::Result<()> {
1597        let corpus = TempDir::new()?;
1598        let project_dir = corpus.path().join("-tmp-pond-test");
1599        std::fs::create_dir_all(&project_dir)?;
1600        let session_uuid = "33333333-3333-3333-3333-333333333333";
1601        let dup_uuid = "u-shared-1";
1602        let row = serde_json::json!({
1603            "type": "user",
1604            "uuid": dup_uuid,
1605            "sessionId": session_uuid,
1606            "cwd": "/tmp/pond-test",
1607            "timestamp": "2026-05-16T00:00:00.000Z",
1608            "message": {"role": "user", "content": "replayed three times"},
1609        });
1610        // Three identical rows back-to-back, same uuid.
1611        let body = format!("{row}\n{row}\n{row}\n");
1612        std::fs::write(project_dir.join(format!("{session_uuid}.jsonl")), body)?;
1613
1614        let store_dir = TempDir::new()?;
1615        let store = Store::open_local(store_dir.path()).await?;
1616        let adapter = ClaudeCodeAdapter::new(corpus.path());
1617        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1618
1619        assert_eq!(
1620            summary.dropped_events, 0,
1621            "adapter must dedupe replays before they reach the validator"
1622        );
1623        assert!(
1624            !summary
1625                .drop_reasons
1626                .contains_key(crate::sessions::DROP_REASON_DUPLICATE_MESSAGE_ID),
1627            "duplicate_message_id bucket stays empty when adapter does its job"
1628        );
1629        Ok(())
1630    }
1631
1632    /// One assistant `tool_use` followed by a user `tool_result` in the same
1633    /// file. The adapter's per-file `tool_use_id -> name` map must resolve the
1634    /// result's tool name to the call's name. Pre-fix: synthesized `"unknown"`.
1635    #[tokio::test(flavor = "multi_thread")]
1636    async fn tool_result_name_resolves_from_prior_tool_use_in_same_file() -> anyhow::Result<()> {
1637        let corpus = TempDir::new()?;
1638        let project_dir = corpus.path().join("-tmp-pond-test");
1639        std::fs::create_dir_all(&project_dir)?;
1640        let session_uuid = "44444444-4444-4444-4444-444444444444";
1641        let call_id = "toolu_test_01";
1642
1643        let tool_use_row = serde_json::json!({
1644            "type": "assistant",
1645            "uuid": "u-call",
1646            "sessionId": session_uuid,
1647            "cwd": "/tmp/pond-test",
1648            "timestamp": "2026-05-16T00:00:00.000Z",
1649            "message": {
1650                "role": "assistant",
1651                "content": [{
1652                    "type": "tool_use",
1653                    "id": call_id,
1654                    "name": "Edit",
1655                    "input": {"file_path": "/tmp/foo"},
1656                }],
1657            },
1658        });
1659        let tool_result_row = serde_json::json!({
1660            "type": "user",
1661            "uuid": "u-result",
1662            "sessionId": session_uuid,
1663            "cwd": "/tmp/pond-test",
1664            "timestamp": "2026-05-16T00:00:01.000Z",
1665            "message": {
1666                "role": "user",
1667                "content": [{
1668                    "type": "tool_result",
1669                    "tool_use_id": call_id,
1670                    "content": "ok",
1671                }],
1672            },
1673        });
1674        std::fs::write(
1675            project_dir.join(format!("{session_uuid}.jsonl")),
1676            format!("{tool_use_row}\n{tool_result_row}\n"),
1677        )?;
1678
1679        let store_dir = TempDir::new()?;
1680        let store = Store::open_local(store_dir.path()).await?;
1681        let adapter = ClaudeCodeAdapter::new(corpus.path());
1682        let _summary =
1683            ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1684        let session = store
1685            .get_session(session_uuid)
1686            .await?
1687            .expect("session ingests");
1688
1689        let mut saw_call = false;
1690        let mut saw_result = false;
1691        for stored in &session.messages {
1692            for part in &stored.parts {
1693                match &part.kind {
1694                    PartKind::ToolCall {
1695                        call_id: cid, name, ..
1696                    } => {
1697                        assert_eq!(cid.as_ref().map(|e| e.as_str()), Some(call_id));
1698                        assert_eq!(
1699                            name.as_ref().map(|e| e.as_str()),
1700                            Some("Edit"),
1701                            "tool_use carries the name directly"
1702                        );
1703                        saw_call = true;
1704                    }
1705                    PartKind::ToolResult {
1706                        call_id: cid, name, ..
1707                    } => {
1708                        assert_eq!(cid.as_ref().map(|e| e.as_str()), Some(call_id));
1709                        assert_eq!(
1710                            name.as_ref().map(|e| e.as_str()),
1711                            Some("Edit"),
1712                            "tool_result resolves the name via the per-file map (was 'unknown' pre-2026-05-16)"
1713                        );
1714                        saw_result = true;
1715                    }
1716                    _ => {}
1717                }
1718            }
1719        }
1720        assert!(saw_call && saw_result, "both parts must be present");
1721        Ok(())
1722    }
1723
1724    /// spec.md#model-part-provenance: a genuine human prompt classifies
1725    /// `conversational`; a harness `<task-notification>` user-slot turn and an
1726    /// `isMeta` row classify `injected`.
1727    #[test]
1728    fn user_text_provenance_separates_prompts_from_harness_injection() {
1729        let prompt = json!({"type": "user", "uuid": "u1"});
1730        assert_eq!(
1731            user_text_provenance(&prompt, "please refactor the parser"),
1732            Provenance::Conversational,
1733        );
1734
1735        let notification = json!({"type": "user", "uuid": "u2"});
1736        assert_eq!(
1737            user_text_provenance(
1738                &notification,
1739                "<task-notification>background task done</task-notification>",
1740            ),
1741            Provenance::Injected,
1742        );
1743
1744        let meta = json!({"type": "user", "uuid": "u3", "isMeta": true});
1745        assert_eq!(
1746            user_text_provenance(&meta, "expanded skill body"),
1747            Provenance::Injected,
1748        );
1749    }
1750
1751    /// Ingest a session carrying a `<task-notification>` user message and a
1752    /// genuine prompt; the notification's part must be `injected` and the
1753    /// prompt's `conversational` (spec.md#model-part-provenance).
1754    #[tokio::test(flavor = "multi_thread")]
1755    async fn task_notification_message_yields_injected_parts() -> anyhow::Result<()> {
1756        let corpus = TempDir::new()?;
1757        let project_dir = corpus.path().join("-tmp-pond-test");
1758        std::fs::create_dir_all(&project_dir)?;
1759        let session_uuid = "66666666-6666-6666-6666-666666666666";
1760        let prompt = serde_json::json!({
1761            "type": "user",
1762            "uuid": "u-prompt",
1763            "sessionId": session_uuid,
1764            "cwd": "/tmp/pond-test",
1765            "timestamp": "2026-05-16T00:00:00.000Z",
1766            "message": {"role": "user", "content": "genuine human prompt"},
1767        });
1768        let notification = serde_json::json!({
1769            "type": "user",
1770            "uuid": "u-notify",
1771            "sessionId": session_uuid,
1772            "cwd": "/tmp/pond-test",
1773            "timestamp": "2026-05-16T00:00:01.000Z",
1774            "message": {
1775                "role": "user",
1776                "content": "<task-notification>a background task finished</task-notification>",
1777            },
1778        });
1779        std::fs::write(
1780            project_dir.join(format!("{session_uuid}.jsonl")),
1781            format!("{prompt}\n{notification}\n"),
1782        )?;
1783
1784        let store_dir = TempDir::new()?;
1785        let store = Store::open_local(store_dir.path()).await?;
1786        let adapter = ClaudeCodeAdapter::new(corpus.path());
1787        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1788
1789        let session = store
1790            .get_session(session_uuid)
1791            .await?
1792            .expect("session ingests");
1793        let mut saw_prompt = false;
1794        let mut saw_notification = false;
1795        for stored in &session.messages {
1796            for part in &stored.parts {
1797                if stored.message.id() == "u-prompt" {
1798                    assert_eq!(part.provenance, crate::wire::Provenance::Conversational);
1799                    saw_prompt = true;
1800                }
1801                if stored.message.id() == "u-notify" {
1802                    assert_eq!(part.provenance, crate::wire::Provenance::Injected);
1803                    saw_notification = true;
1804                }
1805            }
1806        }
1807        assert!(saw_prompt && saw_notification, "both messages present");
1808        Ok(())
1809    }
1810
1811    /// Orphan tool_result with no earlier tool_use in the same file: the
1812    /// per-file map can't resolve. The adapter must emit `name: None`, NOT
1813    /// the old `"unknown"` sentinel. Invariant 15 (no synthesized values).
1814    #[tokio::test(flavor = "multi_thread")]
1815    async fn orphan_tool_result_yields_name_none_not_unknown_sentinel() -> anyhow::Result<()> {
1816        let corpus = TempDir::new()?;
1817        let project_dir = corpus.path().join("-tmp-pond-test");
1818        std::fs::create_dir_all(&project_dir)?;
1819        let session_uuid = "55555555-5555-5555-5555-555555555555";
1820
1821        // tool_result with no earlier tool_use (simulates a compaction-pruned call).
1822        let row = serde_json::json!({
1823            "type": "user",
1824            "uuid": "u-orphan",
1825            "sessionId": session_uuid,
1826            "cwd": "/tmp/pond-test",
1827            "timestamp": "2026-05-16T00:00:00.000Z",
1828            "message": {
1829                "role": "user",
1830                "content": [{
1831                    "type": "tool_result",
1832                    "tool_use_id": "toolu_orphan",
1833                    "content": "result body, no matching call",
1834                }],
1835            },
1836        });
1837        std::fs::write(
1838            project_dir.join(format!("{session_uuid}.jsonl")),
1839            format!("{row}\n"),
1840        )?;
1841
1842        let store_dir = TempDir::new()?;
1843        let store = Store::open_local(store_dir.path()).await?;
1844        let adapter = ClaudeCodeAdapter::new(corpus.path());
1845        let _summary =
1846            ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1847        let session = store
1848            .get_session(session_uuid)
1849            .await?
1850            .expect("session ingests");
1851        let mut found = false;
1852        for stored in &session.messages {
1853            for part in &stored.parts {
1854                if let PartKind::ToolResult { name, call_id, .. } = &part.kind {
1855                    assert_eq!(call_id.as_ref().map(|e| e.as_str()), Some("toolu_orphan"));
1856                    assert!(
1857                        name.is_none(),
1858                        "orphan tool_result must be name=None, not synthesized 'unknown'",
1859                    );
1860                    found = true;
1861                }
1862            }
1863        }
1864        assert!(found, "orphan tool_result part must be present");
1865        // Sanity: even an orphan should not be reported as a drop.
1866        Ok(())
1867    }
1868
1869    /// The Workflow runner's `journal.jsonl` under `subagents/workflows/<wf>/`
1870    /// is a known control file, not an unrecognized transcript - it must NOT be
1871    /// flagged unsupported (it falls through to a benign Empty skip).
1872    #[test]
1873    fn workflow_journal_is_a_control_file_not_unsupported() {
1874        let adapter = ClaudeCodeAdapter::new("/tmp/pond-test-root");
1875        let journal = std::path::Path::new(
1876            "/root/-proj/55555555-5555-5555-5555-555555555555/subagents/workflows/wf_030e6487-da6/journal.jsonl",
1877        );
1878        assert!(is_workflow_control_file(journal));
1879        assert!(
1880            adapter.unsupported_reason(journal).is_none(),
1881            "journal.jsonl is a known control file, not an unsupported layout",
1882        );
1883    }
1884
1885    /// Regression guard against narrowing the net too far: a genuinely unknown
1886    /// leaf under `subagents/` is still flagged unsupported, while a recognized
1887    /// `agent-<hash>.jsonl` is not.
1888    #[test]
1889    fn unknown_subagents_leaf_is_still_unsupported() {
1890        let adapter = ClaudeCodeAdapter::new("/tmp/pond-test-root");
1891        let unknown = std::path::Path::new(
1892            "/root/-proj/PARENT/subagents/workflows/wf_x/transcript-001.jsonl",
1893        );
1894        assert!(
1895            adapter.unsupported_reason(unknown).is_some(),
1896            "an unrecognized non-agent, non-journal leaf must still fail visibly",
1897        );
1898        assert!(!is_workflow_control_file(unknown));
1899
1900        let agent = std::path::Path::new("/root/-proj/PARENT/subagents/agent-abc123def456.jsonl");
1901        assert!(
1902            adapter.unsupported_reason(agent).is_none(),
1903            "a recognized agent transcript is resolvable, not unsupported",
1904        );
1905    }
1906
1907    /// End-to-end: a workflow dir holding both a real `agent-<hash>.jsonl`
1908    /// transcript and the runner's `journal.jsonl`. The agent transcript
1909    /// ingests as a child session; the journal is a benign skip (no
1910    /// `skipped_files` failure) and its rows never merge into the parent.
1911    #[tokio::test(flavor = "multi_thread")]
1912    async fn workflow_journal_skipped_benignly_while_sibling_agent_ingests() -> anyhow::Result<()> {
1913        let corpus = TempDir::new()?;
1914        let project_dir = corpus.path().join("-tmp-pond-test");
1915        let parent_uuid = "77777777-7777-7777-7777-777777777777";
1916        let wf_id = "wf_030e6487-da6";
1917        let agent_hash = "a38f4724ef3864da8";
1918        let wf_dir = project_dir
1919            .join(parent_uuid)
1920            .join("subagents")
1921            .join("workflows")
1922            .join(wf_id);
1923        std::fs::create_dir_all(&wf_dir)?;
1924
1925        let parent_row = serde_json::json!({
1926            "type": "user",
1927            "uuid": "u-parent-1",
1928            "sessionId": parent_uuid,
1929            "cwd": "/tmp/pond-test",
1930            "timestamp": "2026-06-04T00:00:00.000Z",
1931            "message": {"role": "user", "content": "hi parent"},
1932        });
1933        std::fs::write(
1934            project_dir.join(format!("{parent_uuid}.jsonl")),
1935            format!("{parent_row}\n"),
1936        )?;
1937
1938        let agent_row = serde_json::json!({
1939            "type": "user",
1940            "uuid": "u-agent-1",
1941            "sessionId": parent_uuid,
1942            "cwd": "/tmp/pond-test",
1943            "timestamp": "2026-06-04T00:01:00.000Z",
1944            "message": {"role": "user", "content": "workflow agent prompt"},
1945        });
1946        std::fs::write(
1947            wf_dir.join(format!("agent-{agent_hash}.jsonl")),
1948            format!("{agent_row}\n"),
1949        )?;
1950
1951        // The Workflow journal: control events only, no sessionId.
1952        std::fs::write(
1953            wf_dir.join("journal.jsonl"),
1954            "{\"type\":\"started\",\"key\":\"v2:abc\",\"agentId\":\"a38f\"}\n\
1955             {\"type\":\"result\",\"key\":\"v2:abc\",\"agentId\":\"a38f\",\"result\":{}}\n",
1956        )?;
1957
1958        let store_dir = TempDir::new()?;
1959        let store = Store::open_local(store_dir.path()).await?;
1960        let adapter = ClaudeCodeAdapter::new(corpus.path());
1961        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1962        assert_eq!(
1963            summary.skipped_files, 0,
1964            "journal.jsonl is a control file (benign Empty skip), not an unsupported failure",
1965        );
1966
1967        let child = store
1968            .get_session(&format!(
1969                "{parent_uuid}/workflows/{wf_id}/agent-{agent_hash}"
1970            ))
1971            .await?
1972            .expect("the sibling agent transcript still ingests as a child session");
1973        assert_eq!(
1974            child.session.parent_session_id.as_deref(),
1975            Some(parent_uuid)
1976        );
1977
1978        let parent = store
1979            .get_session(parent_uuid)
1980            .await?
1981            .expect("parent session ingests");
1982        assert_eq!(
1983            parent.messages.len(),
1984            1,
1985            "journal rows must NOT merge into the parent session",
1986        );
1987        Ok(())
1988    }
1989
1990    /// Hardening: even a journal.jsonl whose rows DO carry the parent
1991    /// `sessionId` must not merge - the guard is structural, not contingent on
1992    /// the journal lacking one.
1993    #[tokio::test(flavor = "multi_thread")]
1994    async fn workflow_journal_with_parent_sessionid_still_not_merged() -> anyhow::Result<()> {
1995        let corpus = TempDir::new()?;
1996        let project_dir = corpus.path().join("-tmp-pond-test");
1997        let parent_uuid = "88888888-8888-8888-8888-888888888888";
1998        let wf_dir = project_dir
1999            .join(parent_uuid)
2000            .join("subagents")
2001            .join("workflows")
2002            .join("wf_abc01234-def");
2003        std::fs::create_dir_all(&wf_dir)?;
2004
2005        let parent_row = serde_json::json!({
2006            "type": "user",
2007            "uuid": "u-parent",
2008            "sessionId": parent_uuid,
2009            "cwd": "/tmp/pond-test",
2010            "timestamp": "2026-06-04T00:00:00.000Z",
2011            "message": {"role": "user", "content": "parent only"},
2012        });
2013        std::fs::write(
2014            project_dir.join(format!("{parent_uuid}.jsonl")),
2015            format!("{parent_row}\n"),
2016        )?;
2017
2018        // A journal carrying the PARENT sessionId (hypothetical future shape):
2019        // the structural guard must still refuse to merge it.
2020        let journal_row = serde_json::json!({
2021            "type": "started",
2022            "key": "v2:abc",
2023            "agentId": "a1",
2024            "sessionId": parent_uuid,
2025            "message": {"role": "user", "content": "must not merge"},
2026        });
2027        std::fs::write(wf_dir.join("journal.jsonl"), format!("{journal_row}\n"))?;
2028
2029        let store_dir = TempDir::new()?;
2030        let store = Store::open_local(store_dir.path()).await?;
2031        let adapter = ClaudeCodeAdapter::new(corpus.path());
2032        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
2033        assert_eq!(
2034            summary.skipped_files, 0,
2035            "journal is a benign Empty skip, not an unsupported failure",
2036        );
2037        let parent = store
2038            .get_session(parent_uuid)
2039            .await?
2040            .expect("parent session ingests");
2041        assert_eq!(
2042            parent.messages.len(),
2043            1,
2044            "journal row must NOT merge even when it carries the parent sessionId",
2045        );
2046        Ok(())
2047    }
2048}