Skip to main content

pond/adapter/
claude_code.rs

1//! Claude Code CLI adapter.
2//!
3//! Source path: `~/.claude/projects/<encoded-project-path>/<session-uuid>.jsonl`.
4//! Each `.jsonl` file is one session; lines are typed entries linked via a
5//! `parentUuid` -> `uuid` chain. Tool results arrive as `user` entries whose
6//! `message.content[]` contains `tool_result` blocks with a parallel
7//! `toolUseResult` field carrying structured data.
8
9use std::{
10    collections::{HashMap, HashSet},
11    path::{Path, PathBuf},
12};
13
14use anyhow::Context as _;
15use chrono::{DateTime, SecondsFormat, Utc};
16use serde_json::{Value, json};
17
18use crate::{
19    sessions::IngestEvent,
20    wire::{FileData, Message, Part, PartKind, Provenance, ProviderOptions, Session},
21};
22
23use super::{
24    Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
25    RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
26    empty_options,
27    extract::{
28        Extracted, Source, extract_compact_repr, extract_raw_record, extract_self_str, extract_str,
29    },
30    extracted_text,
31    jsonl::{BoundedRow, JsonlTree, jsonl_tree_discover, jsonl_tree_events, source_line},
32    jsonl_bytes, part_id, part_ordinal, raw_record,
33};
34
/// Per-file streaming state that persists across rows of one JSONL file.
/// Lives inside [`Adapter::events`]'s per-file loop and is reset whenever
/// the loop advances to the next file.
///
/// Two responsibilities:
///
/// 1. **Replay dedup.** Claude Code's `/resume` and `/compact` paths
///    occasionally re-emit byte-identical rows with the same `uuid` (the
///    stale-`messageSet`-cache bug in claude-code, see
///    `utils/sessionStorage.ts`). The adapter dedupes here so the validator
///    never sees the same PK twice; validator HashSet stays a safety net.
///
/// 2. **`tool_use_id -> tool name` resolution.** The raw `tool_result` row
///    carries only `tool_use_id`, not the tool name; the name lives on the
///    prior `tool_use` row in the same file. We populate this map when we
///    see a `tool_use` part, then look it up when we see the matching
///    `tool_result` part. Misses (e.g. compaction pruned the tool_use)
///    surface as `name: None` in `PartKind::ToolResult` rather than the
///    old `"unknown"` sentinel - faithful to the source rather than
///    inventing a value.
#[derive(Debug, Default)]
pub(crate) struct FileState {
    /// Row `uuid`s already seen in the current file. A row whose `uuid` is
    /// already present yields no events (it is treated as a replay).
    seen_uuids: HashSet<String>,
    /// `tool_use` / `server_tool_use` block `id` -> tool `name`, filled in by
    /// [`capture_tool_call_names`] for every row before its events are built.
    tool_call_names: HashMap<String, Extracted<String>>,
}
60
/// Stable adapter name. Surfaces as the `[sources.claude-code]` config key,
/// the `pond sync claude-code` CLI arg, and `Session.source_agent` on every
/// emitted row. Within this module it is also the adapter tag passed to
/// `config_path`, `jsonl_bytes` and every `AdapterError::schema` call.
const NAME: &str = "claude-code";
65
/// Stateless factory: opens [`ClaudeCodeAdapter`] instances from config and
/// probes for the canonical install location under `~/.claude/projects`.
///
/// A unit struct - all behavior lives in its [`AdapterFactory`] impl.
pub struct ClaudeCodeFactory;
69
70impl AdapterFactory for ClaudeCodeFactory {
71    fn name(&self) -> &'static str {
72        NAME
73    }
74
75    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
76        Ok(Box::new(ClaudeCodeAdapter::new(config_path(NAME, config)?)))
77    }
78
79    fn probe_default(&self, env: &Env) -> Option<Value> {
80        let path = env.home.join(".claude").join("projects");
81        path.exists().then(|| json!({ "path": path }))
82    }
83
84    fn serialize(
85        &self,
86        session: &crate::sessions::SessionWithMessages,
87        fidelity: RestoreFidelity,
88    ) -> Result<Vec<RestoredFile>, AdapterError> {
89        serialize_session(session, fidelity)
90    }
91}
92
93fn serialize_session(
94    session: &crate::sessions::SessionWithMessages,
95    fidelity: RestoreFidelity,
96) -> Result<Vec<RestoredFile>, AdapterError> {
97    let mut messages = session.messages.clone();
98    if fidelity == RestoreFidelity::Native {
99        messages.sort_by(|left, right| {
100            source_line(left.message.options())
101                .cmp(&source_line(right.message.options()))
102                .then_with(|| by_timestamp_then_id(left, right))
103        });
104    } else {
105        messages.sort_by(by_timestamp_then_id);
106    }
107    // Native replays the verbatim `options.source.raw_record`; `claude_record`
108    // below is foreign-only. Replay echoes a frozen snapshot - safe only while
109    // canonical is append-only (spec.md#adapter-integrity-additive-sync).
110    let mut records = Vec::with_capacity(messages.len());
111    let mut parent_uuid = None::<String>;
112    for message in &messages {
113        if fidelity == RestoreFidelity::Native
114            && let Some(raw) = raw_record(message.message.options())
115        {
116            parent_uuid = raw
117                .get("uuid")
118                .and_then(Value::as_str)
119                .map(ToOwned::to_owned)
120                .or(parent_uuid);
121            records.push(raw);
122            continue;
123        }
124        // `claude_record` returns `None` for a dropped System message;
125        // `parent_uuid` then stays put so the chain skips over the gap.
126        let Some(record) = claude_record(session, message, parent_uuid.as_deref()) else {
127            continue;
128        };
129        parent_uuid = record
130            .get("uuid")
131            .and_then(Value::as_str)
132            .map(ToOwned::to_owned);
133        records.push(record);
134    }
135
136    let mut files = vec![RestoredFile::new(
137        claude_relative_path(session),
138        jsonl_bytes(NAME, &records)?,
139        fidelity,
140    )];
141    if session.session.parent_session_id.is_some()
142        && let Some(meta) = subagent_meta_record(session)
143    {
144        let mut meta_path = files[0].relative_path.clone();
145        meta_path.set_extension("meta.json");
146        files.push(RestoredFile::new(
147            meta_path,
148            serde_json::to_vec(&meta).map_err(|err| {
149                AdapterError::schema(
150                    NAME,
151                    &session.session.id,
152                    format!("json encode failed: {err}"),
153                )
154            })?,
155            fidelity,
156        ));
157    }
158    Ok(files)
159}
160
161fn claude_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
162    let encoded_project = session
163        .session
164        .options
165        .get("source")
166        .and_then(|source| source.get("project_dir"))
167        .and_then(Value::as_str)
168        .map(ToOwned::to_owned)
169        .unwrap_or_else(|| encode_project(&session.session.project));
170    if let Some(parent) = &session.session.parent_session_id {
171        // The child id is `<parent>/<child_suffix>`; the suffix is the file's
172        // path under `subagents/` (`agent-<hash>` flat, or
173        // `workflows/<wf-id>/agent-<hash>` nested), so stripping the parent
174        // prefix reconstructs the on-disk path verbatim.
175        let child_suffix = session
176            .session
177            .id
178            .strip_prefix(&format!("{parent}/"))
179            .unwrap_or(&session.session.id);
180        return PathBuf::from(encoded_project)
181            .join(parent)
182            .join("subagents")
183            .join(format!("{child_suffix}.jsonl"));
184    }
185    PathBuf::from(encoded_project).join(format!("{}.jsonl", session.session.id))
186}
187
/// Encode a project path as a directory name by turning every `/` and `.`
/// into `-`; all other characters pass through unchanged.
fn encode_project(project: &str) -> String {
    project
        .chars()
        .map(|ch| if matches!(ch, '/' | '.') { '-' } else { ch })
        .collect()
}
191
192fn subagent_meta_record(session: &crate::sessions::SessionWithMessages) -> Option<Value> {
193    // Restore the sidecar `.meta.json` verbatim from the stored copy. A
194    // subagent ingested without a meta file stored `meta: null` - nothing
195    // to write back.
196    let meta = session.session.options.get("subagent")?.get("meta")?;
197    meta.is_object().then(|| meta.clone())
198}
199
200fn claude_record(
201    session: &crate::sessions::SessionWithMessages,
202    message: &crate::sessions::MessageWithParts,
203    parent_uuid: Option<&str>,
204) -> Option<Value> {
205    // Foreign restore into Claude Code (native restore re-emits the stored
206    // `raw_record` and never reaches here). Claude Code's transcript has only
207    // `user` and `assistant` rows: a tool result is a `user` row, and there
208    // is no in-transcript system turn - a System message (a rule-3 carrier or
209    // a source's own system/developer turn) has no idiomatic home and is
210    // dropped; the content stays in canonical (spec.md#adapter-native-restore-lossless,
211    // foreign clause).
212    let row_role = match &message.message {
213        Message::System { .. } => return None,
214        Message::User { .. } | Message::Tool { .. } => "user",
215        Message::Assistant { .. } => "assistant",
216    };
217    let mut envelope = serde_json::Map::new();
218    envelope.insert("role".to_owned(), Value::String(row_role.to_owned()));
219    if row_role == "assistant" {
220        // `type:"message"` is the Anthropic Messages API object discriminator
221        // - a constant, always present on a real assistant row.
222        envelope.insert("type".to_owned(), Value::String("message".to_owned()));
223    }
224    envelope.insert(
225        "content".to_owned(),
226        Value::Array(message.parts.iter().map(claude_part).collect()),
227    );
228    Some(json!({
229        "parentUuid": parent_uuid,
230        "isSidechain": false,
231        "userType": "external",
232        "cwd": &*session.session.project,
233        "sessionId": &session.session.id,
234        "type": row_role,
235        "message": Value::Object(envelope),
236        "uuid": message.message.id(),
237        "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
238    }))
239}
240
241fn claude_part(part: &Part) -> Value {
242    match &part.kind {
243        PartKind::Text { text } => json!({"type": "text", "text": extracted_text(text)}),
244        PartKind::Reasoning { text } => {
245            json!({"type": "thinking", "thinking": extracted_text(text)})
246        }
247        PartKind::ToolCall {
248            call_id,
249            name,
250            params,
251            provider_executed,
252        } => json!({
253            "type": if *provider_executed { "server_tool_use" } else { "tool_use" },
254            "id": extracted_text(call_id),
255            "name": extracted_text(name),
256            "input": params,
257        }),
258        PartKind::ToolResult {
259            call_id,
260            is_failure,
261            result,
262            ..
263        } => json!({
264            "type": "tool_result",
265            "tool_use_id": extracted_text(call_id),
266            "is_error": is_failure,
267            "content": result,
268        }),
269        PartKind::File {
270            media_type,
271            file_name,
272            data,
273        } => json!({
274            "type": "file",
275            "media_type": media_type,
276            "file_name": file_name,
277            "source": file_source(data),
278        }),
279        other => {
280            json!({"type": "text", "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null))})
281        }
282    }
283}
284
285fn file_source(data: &FileData) -> Value {
286    match data {
287        FileData::String(value) => json!({"type": "text", "data": value}),
288        FileData::Bytes(value) => json!({"type": "base64", "data": value}),
289        FileData::Url(value) => json!({"type": "url", "url": value}),
290    }
291}
292
/// Configured claude-code reader. Walks a tree of `*.jsonl` files under
/// [`Self::root`] and yields canonical events in source order per session.
#[derive(Debug, Clone)]
pub struct ClaudeCodeAdapter {
    /// Directory the adapter reads from; exposed to the shared JSONL-tree
    /// helpers through [`JsonlTree::root`].
    root: PathBuf,
}
299
300impl ClaudeCodeAdapter {
301    pub fn new(root: impl Into<PathBuf>) -> Self {
302        Self { root: root.into() }
303    }
304}
305
impl Adapter for ClaudeCodeAdapter {
    /// Discovery is delegated wholesale to `jsonl_tree_discover`, which is
    /// handed this adapter.
    fn discover(&self) -> DiscoverFuture<'_> {
        jsonl_tree_discover(self)
    }

    /// Event streaming is delegated wholesale to `jsonl_tree_events`, which
    /// is handed this adapter and the caller's skip `oracle`.
    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
        jsonl_tree_events(self, oracle)
    }
}
315
316impl JsonlTree for ClaudeCodeAdapter {
317    type State = FileState;
318
319    fn name(&self) -> &'static str {
320        NAME
321    }
322
323    fn root(&self) -> &Path {
324        &self.root
325    }
326
327    fn peek_session_id(&self, path: &Path, first_line: &str) -> Option<String> {
328        // A file under `subagents/` takes its id from the path, never from the
329        // row's content `sessionId` (that's the parent's). A recognized child
330        // peeks to its child id; an unrecognized one returns `None` so it stays
331        // out of the freshness gate and its `unsupported_reason` failure
332        // re-surfaces on every sync rather than being skipped as `Fresh` under
333        // the parent's borrowed watermark. See spec.md#datasets.
334        if subagents_dir(path).is_some() {
335            let (parent_uuid, child_suffix, _) = subagent_ids(path)?;
336            return Some(format!("{parent_uuid}/{child_suffix}"));
337        }
338        let row: Value = serde_json::from_str(first_line).ok()?;
339        row.get("sessionId")?.as_str().map(ToOwned::to_owned)
340    }
341
342    fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
343        session_from_rows(path, rows)
344    }
345
346    fn events_from_row(
347        &self,
348        session: &Session,
349        row: &BoundedRow,
350        state: &mut Self::State,
351    ) -> Result<Vec<IngestEvent>, String> {
352        if let Some(uuid) = row.value.get("uuid").and_then(Value::as_str)
353            && !state.seen_uuids.insert(uuid.to_owned())
354        {
355            return Ok(Vec::new());
356        }
357        capture_tool_call_names(&row.value, &mut state.tool_call_names);
358        events_from_row(&session.id, row.line, &row.value, session.created_at, state)
359    }
360
361    fn unsupported_reason(&self, path: &Path) -> Option<String> {
362        // A `.jsonl` under a `subagents/` ancestor that we can't resolve to a
363        // child id (its leaf isn't `agent-<hash>.jsonl`) must NOT fall back to
364        // its content `sessionId` - that id is the parent's, so it would
365        // silently merge into the parent session. Fail visibly and wait for an
366        // adapter update instead - EXCEPT the Workflow runner's `journal.jsonl`,
367        // a known control file that carries no `sessionId`, so it falls through
368        // to `session()` and is dropped as a benign Empty skip rather than
369        // flagged as an unrecognized transcript layout. See spec.md#datasets.
370        if subagents_dir(path).is_some()
371            && subagent_ids(path).is_none()
372            && !is_workflow_control_file(path)
373        {
374            return Some(format!(
375                "{}: subagent transcript layout not recognized by this pond version; \
376                 skipped so it is not merged into the parent session - update pond and \
377                 re-run `pond sync`",
378                path.display()
379            ));
380        }
381        None
382    }
383}
384
385/// The Workflow runner writes `journal.jsonl` (its resume/cache journal of agent
386/// `started`/`result` events) beside the `agent-<hash>.jsonl` transcripts under
387/// `subagents/workflows/<wf-id>/`. It carries no `sessionId` and only duplicates
388/// content already in those transcripts, so it is a control file to ignore (a
389/// benign Empty skip), not an unrecognized transcript layout. See spec.md#datasets.
390fn is_workflow_control_file(path: &Path) -> bool {
391    subagents_dir(path).is_some()
392        && path.file_name().and_then(|n| n.to_str()) == Some("journal.jsonl")
393}
394
395/// Walk one raw row's `message.content[]` array (if any) and stash every
396/// `tool_use` part's `id -> name` mapping into the per-file map. Idempotent
397/// and safe to call on every row regardless of role; non-assistant rows
398/// just don't contribute entries.
399fn capture_tool_call_names(row: &Value, map: &mut HashMap<String, Extracted<String>>) {
400    let Some(items) = row
401        .get("message")
402        .and_then(|message| message.get("content"))
403        .and_then(Value::as_array)
404    else {
405        return;
406    };
407    for item in items {
408        let kind = item.get("type").and_then(Value::as_str);
409        if !matches!(kind, Some("tool_use") | Some("server_tool_use")) {
410            continue;
411        }
412        let (Some(id), Some(name)) = (item.str_field("id"), extract_str(item, "name")) else {
413            continue;
414        };
415        map.insert(id.to_owned(), name);
416    }
417}
418
419fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
420    let path_display = path.display().to_string();
421    // A non-agent leaf under `subagents/` (e.g. the Workflow runner's
422    // journal.jsonl) would borrow the parent's content `sessionId` and silently
423    // merge; refuse structurally rather than rely on the row lacking one.
424    // spec.md#datasets.
425    if subagents_dir(path).is_some() && subagent_ids(path).is_none() {
426        return Err(AdapterError::schema(
427            NAME,
428            path_display,
429            "sidecar/control file under subagents/ has no session of its own",
430        ));
431    }
432    let mut created_at = None;
433    let mut project: Option<Extracted<String>> = None;
434    let mut version = None;
435    for row in rows {
436        if created_at.is_none() {
437            created_at = parse_timestamp(&row.value).ok();
438        }
439        if project.is_none() {
440            project = extract_str(&row.value, "cwd");
441        }
442        if version.is_none() {
443            version = row
444                .value
445                .get("version")
446                .and_then(Value::as_str)
447                .map(ToOwned::to_owned);
448        }
449    }
450
451    let first = rows
452        .first()
453        .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
454    let at_first = format!("{path_display}:{}", first.line);
455    let raw_session_id = first
456        .value
457        .get("sessionId")
458        .and_then(Value::as_str)
459        .ok_or_else(|| {
460            AdapterError::schema(
461                NAME,
462                at_first.clone(),
463                format!("line {} missing sessionId", first.line),
464            )
465        })?
466        .to_owned();
467    let created_at = created_at.ok_or_else(|| {
468        AdapterError::schema(NAME, at_first, "session has no parseable timestamp")
469    })?;
470
471    // Subagent detection. Claude Code stores each subagent's transcript under
472    // the session's `subagents/` sidecar - either flat
473    // (`<parent_dir>/<parent_uuid>/subagents/agent-<hash>.jsonl`) or, for the
474    // workflow runner, nested
475    // (`.../subagents/workflows/<wf-id>/agent-<hash>.jsonl`) - with a sibling
476    // `agent-<hash>.meta.json` carrying `{agentType, description}`. Every such
477    // file shares the parent's `sessionId` in row content, so ingesting it under
478    // that id collides with the parent (the validator's "project is immutable"
479    // rule rejects a cwd-shifted one, and a same-cwd one silently merges). The
480    // fix is to derive a child id from the path - keyed off the `subagents/`
481    // ancestor at any depth - and link back via `parent_session_id`. See
482    // spec.md#datasets.
483    let subagent = subagent_descriptor(path);
484    let project_dir = source_project_dir(path, subagent.is_some());
485    let (session_id, parent_session_id, source_agent, subagent_options) = match subagent {
486        Some(SubagentDescriptor {
487            parent_uuid,
488            child_suffix,
489            agent_hash,
490            agent_type,
491            meta,
492        }) => {
493            let child_id = format!("{parent_uuid}/{child_suffix}");
494            let agent_label = agent_type
495                .as_deref()
496                .map(|t| format!("claude-code/{t}"))
497                .unwrap_or_else(|| "claude-code/subagent".to_owned());
498            // `meta` is the verbatim `.meta.json`; `hash` and `raw_session_id`
499            // are pond-derived (filename hash + parent sessionId). Storing the
500            // whole meta keeps native restore of the sidecar lossless.
501            let metadata = json!({
502                "hash": agent_hash,
503                "raw_session_id": raw_session_id,
504                "meta": meta,
505            });
506            (child_id, Some(parent_uuid), agent_label, Some(metadata))
507        }
508        None => (raw_session_id, None, "claude-code".to_owned(), None),
509    };
510
511    let project = match project {
512        Some(value) => value,
513        None => {
514            let decoded = path
515                .parent()
516                .and_then(|p| p.file_name())
517                .and_then(|n| n.to_str())
518                .map(|s| s.replace('-', "/"))
519                .ok_or_else(|| {
520                    AdapterError::schema(
521                        NAME,
522                        path_display.clone(),
523                        "no `cwd` field in any row and source path is not UTF-8",
524                    )
525                })?;
526            extract_self_str(&Value::String(decoded)).ok_or_else(|| {
527                AdapterError::schema(
528                    NAME,
529                    path_display.clone(),
530                    "internal: Value::String produced None from Source::as_str",
531                )
532            })?
533        }
534    };
535
536    let mut options = ProviderOptions::new();
537    options.insert(
538        "source".to_owned(),
539        json!({
540            "adapter": "claude-code",
541            "version": version,
542            "project_dir": project_dir,
543            "workspace_path": &*project,
544        }),
545    );
546    if let Some(metadata) = subagent_options {
547        options.insert("subagent".to_owned(), metadata);
548    }
549
550    Ok(Session {
551        id: session_id,
552        parent_session_id,
553        parent_message_id: None,
554        source_agent,
555        created_at,
556        project,
557        options,
558    })
559}
560
561fn source_project_dir(path: &Path, is_subagent: bool) -> Option<String> {
562    // The project dir is the grandparent of `subagents/` regardless of how
563    // deeply the transcript nests below it (`.../<project>/<parent_uuid>/
564    // subagents/...`), so climb from the `subagents/` ancestor rather than a
565    // fixed number of `.parent()` hops.
566    let project_dir = if is_subagent {
567        subagents_dir(path)?.parent()?.parent()
568    } else {
569        path.parent()
570    };
571    project_dir
572        .and_then(|p| p.file_name())
573        .and_then(|n| n.to_str())
574        .map(ToOwned::to_owned)
575}
576
/// The nearest directory named `subagents` among `path`'s ancestors, if any.
/// `path` itself is never considered, only the directories above it.
///
/// Depth-independent: matches both the flat
/// `<parent_uuid>/subagents/agent-<hash>.jsonl` layout and the nested workflow
/// `<parent_uuid>/subagents/workflows/<wf-id>/agent-<hash>.jsonl` layout. The
/// directory directly above the result is the parent session uuid.
fn subagents_dir(path: &Path) -> Option<&Path> {
    path.ancestors()
        // `ancestors()` starts with `path` itself; begin at its parent.
        .skip(1)
        .find(|dir| dir.file_name().and_then(|name| name.to_str()) == Some("subagents"))
}
591
/// Resolved metadata for one subagent JSONL file. `agent_type` is read from
/// the sibling `.meta.json` for the `source_agent` label; `meta` keeps that
/// file's full verbatim content so native restore reproduces it
/// (spec.md#adapter-native-restore-lossless). Both are `None` when the meta file is
/// absent or unreadable (the label falls back to `claude-code/subagent`).
struct SubagentDescriptor {
    /// Name of the directory directly above `subagents/`.
    parent_uuid: String,
    /// Transcript path relative to `subagents/`, `.jsonl` stripped and
    /// `/`-separated.
    child_suffix: String,
    /// The `<hash>` of the `agent-<hash>.jsonl` file name.
    agent_hash: String,
    /// String value of `agentType` in the meta file, when present.
    agent_type: Option<String>,
    /// The whole parsed meta file, when it could be read and parsed.
    meta: Option<Value>,
}
604
605/// `(parent_uuid, child_suffix, agent_hash)` for a subagent transcript, or
606/// `None` for any path without a `subagents/` ancestor or a non-`agent-<hash>`
607/// leaf (the common case: top-level session files). `child_suffix` is the file's
608/// path relative to its `subagents/` ancestor with `.jsonl` stripped -
609/// `agent-<hash>` flat, `workflows/<wf-id>/agent-<hash>` nested - so the derived
610/// child id `<parent_uuid>/<child_suffix>` round-trips back to the on-disk path
611/// on native restore. `agent_hash` keys the sibling `.meta.json` lookup.
612fn subagent_ids(path: &Path) -> Option<(String, String, String)> {
613    let file_name = path.file_name()?.to_str()?;
614    let agent_hash = file_name
615        .strip_prefix("agent-")?
616        .strip_suffix(".jsonl")?
617        .to_owned();
618    let subagents = subagents_dir(path)?;
619    let parent_uuid = subagents.parent()?.file_name()?.to_str()?.to_owned();
620    // The child id must be `/`-canonical on every platform (the rest of the
621    // adapter and `claude_relative_path` assume `/`), but a relative path carries
622    // the OS separator - normalize it. No-op on POSIX.
623    let child_suffix = path
624        .strip_prefix(subagents)
625        .ok()?
626        .with_extension("")
627        .to_str()?
628        .replace(std::path::MAIN_SEPARATOR, "/");
629    Some((parent_uuid, child_suffix, agent_hash))
630}
631
632/// [`subagent_ids`] plus the sibling `agent-<hash>.meta.json` - `agentType` for
633/// the `source_agent` label, the whole file for lossless sidecar restore.
634fn subagent_descriptor(path: &Path) -> Option<SubagentDescriptor> {
635    let (parent_uuid, child_suffix, agent_hash) = subagent_ids(path)?;
636    let meta_path = path.parent()?.join(format!("agent-{agent_hash}.meta.json"));
637    let (agent_type, meta) = match std::fs::read(&meta_path) {
638        Ok(bytes) => match serde_json::from_slice::<Value>(&bytes) {
639            Ok(value) => (
640                value
641                    .get("agentType")
642                    .and_then(Value::as_str)
643                    .map(ToOwned::to_owned),
644                Some(value),
645            ),
646            Err(error) => {
647                tracing::debug!(
648                    target: "pond::adapter::claude_code",
649                    meta = %meta_path.display(),
650                    %error,
651                    "subagent .meta.json present but unparseable; falling back to 'claude-code/subagent'",
652                );
653                (None, None)
654            }
655        },
656        Err(error) if error.kind() == std::io::ErrorKind::NotFound => (None, None),
657        Err(error) => {
658            tracing::debug!(
659                target: "pond::adapter::claude_code",
660                meta = %meta_path.display(),
661                %error,
662                "subagent .meta.json IO error; falling back to 'claude-code/subagent'",
663            );
664            (None, None)
665        }
666    };
667
668    Some(SubagentDescriptor {
669        parent_uuid,
670        child_suffix,
671        agent_hash,
672        agent_type,
673        meta,
674    })
675}
676
677fn events_from_row(
678    session_id: &str,
679    line: usize,
680    row: &Value,
681    default_timestamp: DateTime<Utc>,
682    state: &FileState,
683) -> Result<Vec<IngestEvent>, String> {
684    let timestamp = parse_timestamp(row).unwrap_or(default_timestamp);
685    let uuid = row
686        .get("uuid")
687        .and_then(Value::as_str)
688        .map_or_else(|| format!("{session_id}:{line}"), ToOwned::to_owned);
689
690    if let Some(message_value) = row.get("message") {
691        return message_events(
692            session_id,
693            &uuid,
694            timestamp,
695            row,
696            message_value,
697            state,
698            line,
699        );
700    }
701
702    // Rows with no `message` field are session-metadata records:
703    // `queue-operation`, `permission-mode`, `last-prompt`, `attachment`,
704    // `progress`, `system`, `custom-title`, etc. We preserve them as
705    // System messages with the row's compact JSON in `content` so a future
706    // exporter could reconstruct the original transcript; the `subtype`
707    // becomes the human label via `options.source.raw_type`.
708    let raw_type = row.get("type").and_then(Value::as_str);
709    let content = if raw_type == Some("attachment") {
710        row.get("attachment")
711            .and_then(attachment_content)
712            .or_else(|| Some(extract_compact_repr(row)))
713    } else {
714        extract_str(row, "subtype").or_else(|| extract_str(row, "type"))
715    };
716    let message = Message::System {
717        id: uuid,
718        session_id: session_id.to_owned(),
719        timestamp,
720        content,
721        options: row_options(row, line),
722    };
723    Ok(vec![IngestEvent::Message(message)])
724}
725
726fn message_events(
727    session_id: &str,
728    uuid: &str,
729    timestamp: DateTime<Utc>,
730    row: &Value,
731    message_value: &Value,
732    state: &FileState,
733    line: usize,
734) -> Result<Vec<IngestEvent>, String> {
735    let role = message_value
736        .get("role")
737        .and_then(Value::as_str)
738        .ok_or_else(|| "message missing role".to_owned())?;
739    let content = message_value.get("content").unwrap_or(&Value::Null);
740    let mut parts = Vec::new();
741    let message = match (role, content) {
742        ("user", Value::String(text)) => {
743            // spec.md#model-part-provenance: a user-slot turn is conversation only
744            // when it is a genuine human prompt; harness-injected wrappers and
745            // `isMeta` rows are scaffolding.
746            let provenance = user_text_provenance(row, text);
747            parts.push(text_part(
748                session_id,
749                uuid,
750                0,
751                extract_self_str(content),
752                provenance,
753            ));
754            Message::User {
755                id: uuid.to_owned(),
756                session_id: session_id.to_owned(),
757                timestamp,
758                options: row_options(row, line),
759            }
760        }
761        ("user", Value::Array(items)) if items.iter().all(is_tool_result) => {
762            let source_tool_result = row.get("toolUseResult").cloned();
763            parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
764                tool_result_part(
765                    session_id,
766                    uuid,
767                    ordinal,
768                    item,
769                    source_tool_result.as_ref(),
770                    state,
771                )
772            }));
773            Message::Tool {
774                id: uuid.to_owned(),
775                session_id: session_id.to_owned(),
776                timestamp,
777                options: row_options(row, line),
778            }
779        }
780        ("user", Value::Array(items)) => {
781            // Classify the whole user message once: v1 claude-code never mixes
782            // provenance within a single message (spec.md#model-part-provenance).
783            let provenance = user_array_provenance(row, items);
784            parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
785                user_part(session_id, uuid, ordinal, item, state, provenance)
786            }));
787            Message::User {
788                id: uuid.to_owned(),
789                session_id: session_id.to_owned(),
790                timestamp,
791                options: row_options(row, line),
792            }
793        }
794        ("assistant", Value::Array(items)) => {
795            parts.extend(
796                items
797                    .iter()
798                    .enumerate()
799                    .map(|(ordinal, item)| assistant_part(session_id, uuid, ordinal, item)),
800            );
801            Message::Assistant {
802                id: uuid.to_owned(),
803                session_id: session_id.to_owned(),
804                timestamp,
805                options: assistant_options(row, message_value, line),
806            }
807        }
808        ("system", Value::String(_)) => Message::System {
809            id: uuid.to_owned(),
810            session_id: session_id.to_owned(),
811            timestamp,
812            content: extract_self_str(content),
813            options: row_options(row, line),
814        },
815        ("system", _) => Message::System {
816            id: uuid.to_owned(),
817            session_id: session_id.to_owned(),
818            timestamp,
819            // Fallback for system messages without a string content: serialize
820            // the structured body as JSON. This is not a synthesized value
821            // (the row genuinely had this content), just a lossless string
822            // encoding of structured data.
823            content: Some(extract_compact_repr(message_value)),
824            options: row_options(row, line),
825        },
826        (other, _) => {
827            return Err(format!("unsupported message role {other}"));
828        }
829    };
830
831    let mut events = Vec::with_capacity(parts.len() + 1);
832    events.push(IngestEvent::Message(message));
833    events.extend(parts.into_iter().map(IngestEvent::Part));
834    Ok(events)
835}
836
837fn text_part(
838    session_id: &str,
839    message_id: &str,
840    ordinal: usize,
841    text: Option<Extracted<String>>,
842    provenance: Provenance,
843) -> Part {
844    Part {
845        session_id: session_id.to_owned(),
846        id: part_id(message_id, ordinal),
847        message_id: message_id.to_owned(),
848        ordinal: part_ordinal(ordinal),
849        provenance,
850        options: empty_options(),
851        kind: PartKind::Text { text },
852    }
853}
854
855fn user_part(
856    session_id: &str,
857    message_id: &str,
858    ordinal: usize,
859    value: &Value,
860    state: &FileState,
861    provenance: Provenance,
862) -> Part {
863    match value.get("type").and_then(Value::as_str) {
864        Some("text") => text_part(
865            session_id,
866            message_id,
867            ordinal,
868            extract_str(value, "text"),
869            provenance,
870        ),
871        Some("image") | Some("file") => {
872            file_part(session_id, message_id, ordinal, value, provenance)
873        }
874        Some("tool_result") => {
875            tool_result_part(session_id, message_id, ordinal, value, None, state)
876        }
877        // Unknown user part shapes: preserve the raw JSON in the Text slot
878        // rather than dropping. This is not a synthesized value - it's a
879        // lossless encoding of structured data the schema doesn't model.
880        _ => text_part(
881            session_id,
882            message_id,
883            ordinal,
884            Some(extract_compact_repr(value)),
885            provenance,
886        ),
887    }
888}
889
890fn assistant_part(session_id: &str, message_id: &str, ordinal: usize, value: &Value) -> Part {
891    // spec.md#model-part-provenance: assistant content - text, reasoning, tool calls -
892    // is model-authored, hence conversational. `tool_result` parts never appear
893    // on an assistant message.
894    match value.get("type").and_then(Value::as_str) {
895        Some("text") => text_part(
896            session_id,
897            message_id,
898            ordinal,
899            extract_str(value, "text"),
900            Provenance::Conversational,
901        ),
902        Some("thinking") => Part {
903            session_id: session_id.to_owned(),
904            id: part_id(message_id, ordinal),
905            message_id: message_id.to_owned(),
906            ordinal: part_ordinal(ordinal),
907            provenance: Provenance::Conversational,
908            options: signature_options(value),
909            kind: PartKind::Reasoning {
910                text: extract_str(value, "thinking"),
911            },
912        },
913        Some("tool_use") => Part {
914            session_id: session_id.to_owned(),
915            id: part_id(message_id, ordinal),
916            message_id: message_id.to_owned(),
917            ordinal: part_ordinal(ordinal),
918            provenance: Provenance::Conversational,
919            options: empty_options(),
920            kind: PartKind::ToolCall {
921                call_id: extract_str(value, "id"),
922                name: extract_str(value, "name"),
923                params: value.get("input").cloned().unwrap_or(Value::Null),
924                provider_executed: false,
925            },
926        },
927        Some("server_tool_use") => Part {
928            session_id: session_id.to_owned(),
929            id: part_id(message_id, ordinal),
930            message_id: message_id.to_owned(),
931            ordinal: part_ordinal(ordinal),
932            provenance: Provenance::Conversational,
933            options: empty_options(),
934            kind: PartKind::ToolCall {
935                call_id: extract_str(value, "id"),
936                name: extract_str(value, "name"),
937                params: value.get("input").cloned().unwrap_or(Value::Null),
938                provider_executed: true,
939            },
940        },
941        Some("image") | Some("file") => file_part(
942            session_id,
943            message_id,
944            ordinal,
945            value,
946            Provenance::Conversational,
947        ),
948        // Same rationale as `user_part`'s fallback: lossless encoding of
949        // an unrecognised structured shape, not synthesised data.
950        _ => text_part(
951            session_id,
952            message_id,
953            ordinal,
954            Some(extract_compact_repr(value)),
955            Provenance::Conversational,
956        ),
957    }
958}
959
960fn tool_result_part(
961    session_id: &str,
962    message_id: &str,
963    ordinal: usize,
964    value: &Value,
965    source_tool_result: Option<&Value>,
966    state: &FileState,
967) -> Part {
968    let call_id = extract_str(value, "tool_use_id");
969    // `tool_result` source rows don't carry the tool name; it's resolved
970    // via the per-file `tool_use_id -> name` map. Misses (compaction pruned
971    // the originating `tool_use`) surface as `None` per spec.md#model-no-synthesis
972    // (schema-honesty: the field is `Option<Extracted<T>>`, not a fabricated
973    // string).
974    let name = value
975        .str_field("tool_use_id")
976        .and_then(|id| state.tool_call_names.get(id))
977        .cloned();
978    let result = value
979        .get("content")
980        .cloned()
981        .or_else(|| source_tool_result.cloned())
982        .unwrap_or(Value::Null);
983    Part {
984        session_id: session_id.to_owned(),
985        id: part_id(message_id, ordinal),
986        message_id: message_id.to_owned(),
987        ordinal: part_ordinal(ordinal),
988        // spec.md#model-part-provenance: tool output is runtime-produced, not
989        // conversation.
990        provenance: Provenance::Injected,
991        options: empty_options(),
992        kind: PartKind::ToolResult {
993            call_id,
994            name,
995            is_failure: value
996                .get("is_error")
997                .and_then(Value::as_bool)
998                .unwrap_or(false),
999            result,
1000        },
1001    }
1002}
1003
1004fn file_part(
1005    session_id: &str,
1006    message_id: &str,
1007    ordinal: usize,
1008    value: &Value,
1009    provenance: Provenance,
1010) -> Part {
1011    let media_type = value
1012        .get("media_type")
1013        .or_else(|| value.get("mime_type"))
1014        .and_then(Value::as_str)
1015        .map(ToOwned::to_owned);
1016    let file_name = value
1017        .get("file_name")
1018        .or_else(|| value.get("name"))
1019        .and_then(Value::as_str)
1020        .map(ToOwned::to_owned);
1021    let data = if let Some(source) = value.get("source") {
1022        if let Some(url) = source.get("url").and_then(Value::as_str) {
1023            FileData::Url(url.to_owned())
1024        } else if let Some(bytes) = source.get("data").and_then(Value::as_str) {
1025            FileData::String(bytes.to_owned())
1026        } else {
1027            FileData::String(compact_json(source))
1028        }
1029    } else if let Some(url) = value.get("url").and_then(Value::as_str) {
1030        FileData::Url(url.to_owned())
1031    } else {
1032        FileData::String(compact_json(value))
1033    };
1034
1035    Part {
1036        session_id: session_id.to_owned(),
1037        id: part_id(message_id, ordinal),
1038        message_id: message_id.to_owned(),
1039        ordinal: part_ordinal(ordinal),
1040        provenance,
1041        options: empty_options(),
1042        kind: PartKind::File {
1043            media_type,
1044            file_name,
1045            data,
1046        },
1047    }
1048}
1049
1050fn row_options(row: &Value, line: usize) -> ProviderOptions {
1051    let mut options = ProviderOptions::new();
1052    let source = json!({
1053        "line": line,
1054        "parent_uuid": row.get("parentUuid"),
1055        "is_sidechain": row.get("isSidechain"),
1056        "user_type": row.get("userType"),
1057        "entrypoint": row.get("entrypoint"),
1058        "cwd": row.get("cwd"),
1059        "version": row.get("version"),
1060        "git_branch": row.get("gitBranch"),
1061        "request_id": row.get("requestId"),
1062        "raw_type": row.get("type"),
1063        "raw_record": extract_raw_record(row),
1064    });
1065    options.insert("source".to_owned(), source);
1066    options
1067}
1068
1069fn assistant_options(row: &Value, message_value: &Value, line: usize) -> ProviderOptions {
1070    let mut options = row_options(row, line);
1071    let anthropic = json!({
1072        "id": message_value.get("id"),
1073        "model": message_value.get("model"),
1074        "stop_reason": message_value.get("stop_reason"),
1075        "stop_sequence": message_value.get("stop_sequence"),
1076        "usage": message_value.get("usage"),
1077    });
1078    options.insert("anthropic".to_owned(), anthropic);
1079    options
1080}
1081
1082fn signature_options(value: &Value) -> ProviderOptions {
1083    let mut options = ProviderOptions::new();
1084    if let Some(signature) = value.get("signature").and_then(Value::as_str) {
1085        options.insert("anthropic".to_owned(), json!({"signature": signature}));
1086    }
1087    options
1088}
1089
1090fn attachment_content(value: &Value) -> Option<Extracted<String>> {
1091    extract_str(value, "content").or_else(|| extract_str(value, "stdout"))
1092}
1093
1094fn parse_timestamp(value: &Value) -> anyhow::Result<DateTime<Utc>> {
1095    let timestamp = value
1096        .get("timestamp")
1097        .and_then(Value::as_str)
1098        .context("missing timestamp")?;
1099    Ok(DateTime::parse_from_rfc3339(timestamp)
1100        .context("invalid timestamp")?
1101        .with_timezone(&Utc))
1102}
1103
1104fn is_tool_result(value: &Value) -> bool {
1105    value.get("type").and_then(Value::as_str) == Some("tool_result")
1106}
1107
1108/// True when the row carries `isMeta: true` - claude-code's marker for an
1109/// expanded skill or command body injected into a user slot.
1110fn is_meta_row(row: &Value) -> bool {
1111    row.get("isMeta").and_then(Value::as_bool) == Some(true)
1112}
1113
/// Recognises the harness-injected wrappers claude-code places inside a
/// user-slot turn (spec.md#model-part-provenance): task notifications,
/// slash-command echoes, local-command caveats/stdout, interrupt notices.
///
/// Leading whitespace is ignored; the match is a case-sensitive prefix test
/// against a fixed list of markers.
fn is_injected_user_text(text: &str) -> bool {
    const WRAPPER_PREFIXES: [&str; 7] = [
        "<task-notification>",
        "<command-name>",
        "<command-message>",
        "<command-args>",
        "<local-command-caveat>",
        "<local-command-stdout>",
        "[Request interrupted by user",
    ];

    let body = text.trim_start();
    WRAPPER_PREFIXES
        .iter()
        .any(|&prefix| body.starts_with(prefix))
}
1127
1128/// Provenance of a string-content user message: `injected` for an `isMeta`
1129/// row or a harness wrapper, `conversational` for a genuine human prompt.
1130fn user_text_provenance(row: &Value, text: &str) -> Provenance {
1131    if is_meta_row(row) || is_injected_user_text(text) {
1132        Provenance::Injected
1133    } else {
1134        Provenance::Conversational
1135    }
1136}
1137
1138/// Provenance of an array-content user message. `isMeta` flags the whole row;
1139/// otherwise a leading text item carrying a harness wrapper marks it injected.
1140/// v1 claude-code never interleaves both within one message.
1141fn user_array_provenance(row: &Value, items: &[Value]) -> Provenance {
1142    if is_meta_row(row) {
1143        return Provenance::Injected;
1144    }
1145    let wrapped = items.iter().any(|item| {
1146        item.get("type").and_then(Value::as_str) == Some("text")
1147            && item
1148                .get("text")
1149                .and_then(Value::as_str)
1150                .is_some_and(is_injected_user_text)
1151    });
1152    if wrapped {
1153        Provenance::Injected
1154    } else {
1155        Provenance::Conversational
1156    }
1157}
1158
1159#[cfg(test)]
1160mod tests {
1161    //! Conformance tests for the claude-code adapter's data-shape contract:
1162    //! subagent path derivation, replay dedup, tool-name resolution, and the
1163    //! "no synthesized values" invariant (spec.md#model-no-synthesis, spec.md#model-schema-honesty, and spec.md#model-lossless-projection).
1164    //!
1165    //! Each test builds a tiny synthetic corpus under a `TempDir` so the
1166    //! assertions exercise the real adapter end-to-end without depending on
1167    //! committed fixtures.
1168    #![allow(clippy::expect_used, clippy::unwrap_used)]
1169
1170    use super::*;
1171    use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
1172    use tempfile::TempDir;
1173
1174    // Manifest-dir anchored: unit tests must not depend on the process cwd
1175    // (figment::Jail chdirs the whole test process while config tests run).
1176    const FIXTURE_ROOT: &str = concat!(
1177        env!("CARGO_MANIFEST_DIR"),
1178        "/tests/fixtures/adapter/claude_code/projects"
1179    );
1180
1181    #[test]
1182    fn probe_default_finds_claude_projects_under_home() -> anyhow::Result<()> {
1183        crate::adapter::test_support::assert_probe_default(
1184            &ClaudeCodeFactory,
1185            &[".claude", "projects"],
1186        )
1187    }
1188
1189    #[tokio::test(flavor = "multi_thread")]
1190    async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
1191        let adapter = ClaudeCodeAdapter::new(FIXTURE_ROOT);
1192        crate::adapter::test_support::assert_native_restore(
1193            &ClaudeCodeFactory,
1194            &adapter,
1195            std::path::Path::new(FIXTURE_ROOT),
1196        )
1197        .await
1198    }
1199
1200    /// `<root>/<encoded-cwd>/<parent_uuid>.jsonl` plus
1201    /// `<root>/<encoded-cwd>/<parent_uuid>/subagents/agent-<hash>.jsonl` plus
1202    /// `agent-<hash>.meta.json`. The subagent file must:
1203    ///   - emit a Session whose `id = "{parent_uuid}/agent-{hash}"`
1204    ///   - have `parent_session_id = Some(parent_uuid)`
1205    ///   - have `source_agent = "claude-code/{agentType}"` from the meta file
1206    ///   - have `options.subagent` carrying the hash + agent_type + description
1207    #[tokio::test(flavor = "multi_thread")]
1208    async fn subagent_file_derives_child_session_with_parent_link() -> anyhow::Result<()> {
1209        let corpus = TempDir::new()?;
1210        let project_dir = corpus.path().join("-tmp-pond-test");
1211        let parent_uuid = "11111111-1111-1111-1111-111111111111";
1212        let agent_hash = "abc123def456";
1213        std::fs::create_dir_all(project_dir.join(parent_uuid).join("subagents"))?;
1214
1215        // Parent session file (one user row to anchor a Session).
1216        let parent_row = serde_json::json!({
1217            "type": "user",
1218            "uuid": "u-parent-1",
1219            "sessionId": parent_uuid,
1220            "cwd": "/tmp/pond-test",
1221            "timestamp": "2026-05-16T00:00:00.000Z",
1222            "version": "2.1.121",
1223            "message": {"role": "user", "content": "hi parent"},
1224        });
1225        std::fs::write(
1226            project_dir.join(format!("{parent_uuid}.jsonl")),
1227            format!("{parent_row}\n"),
1228        )?;
1229
1230        // Subagent file + sibling meta. Carries the SAME sessionId as the parent
1231        // in row content; the adapter must derive a child id from the path.
1232        let subagent_row = serde_json::json!({
1233            "type": "user",
1234            "uuid": "u-sub-1",
1235            "sessionId": parent_uuid,
1236            "cwd": "/tmp/pond-test",
1237            "isSidechain": true,
1238            "agentId": agent_hash,
1239            "timestamp": "2026-05-16T00:01:00.000Z",
1240            "version": "2.1.121",
1241            "message": {"role": "user", "content": "subagent prompt"},
1242        });
1243        std::fs::write(
1244            project_dir
1245                .join(parent_uuid)
1246                .join("subagents")
1247                .join(format!("agent-{agent_hash}.jsonl")),
1248            format!("{subagent_row}\n"),
1249        )?;
1250        std::fs::write(
1251            project_dir
1252                .join(parent_uuid)
1253                .join("subagents")
1254                .join(format!("agent-{agent_hash}.meta.json")),
1255            r#"{"agentType":"general-purpose","description":"do a thing"}"#,
1256        )?;
1257
1258        let store_dir = TempDir::new()?;
1259        let store = Store::open_local(store_dir.path()).await?;
1260        let adapter = ClaudeCodeAdapter::new(corpus.path());
1261
1262        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1263        assert_eq!(
1264            summary.dropped_sessions, 0,
1265            "subagent file must NOT collide with parent (pre-fix this was the project-immutable rejection)"
1266        );
1267
1268        let parent = store
1269            .get_session(parent_uuid)
1270            .await?
1271            .expect("parent session should ingest as the bare uuid");
1272        assert_eq!(parent.session.source_agent, "claude-code");
1273        assert_eq!(parent.session.parent_session_id, None);
1274
1275        let child_id = format!("{parent_uuid}/agent-{agent_hash}");
1276        let child = store
1277            .get_session(&child_id)
1278            .await?
1279            .expect("subagent session must surface under the derived id");
1280        assert_eq!(
1281            child.session.source_agent, "claude-code/general-purpose",
1282            "agent_type from .meta.json should suffix the source_agent label"
1283        );
1284        assert_eq!(
1285            child.session.parent_session_id.as_deref(),
1286            Some(parent_uuid),
1287            "subagent must link back to parent via parent_session_id",
1288        );
1289        let subagent_meta = child
1290            .session
1291            .options
1292            .get("subagent")
1293            .expect("options.subagent must carry the hash + verbatim meta.json");
1294        assert_eq!(subagent_meta["hash"], serde_json::json!(agent_hash));
1295        assert_eq!(
1296            subagent_meta["meta"]["agentType"],
1297            serde_json::json!("general-purpose")
1298        );
1299        assert_eq!(
1300            subagent_meta["meta"]["description"],
1301            serde_json::json!("do a thing")
1302        );
1303        Ok(())
1304    }
1305
1306    /// Subagent file present but the sibling `.meta.json` is missing. The
1307    /// adapter must still derive a child session (so it doesn't collide with
1308    /// the parent) and fall back to `source_agent = "claude-code/subagent"`.
1309    #[tokio::test(flavor = "multi_thread")]
1310    async fn subagent_without_meta_falls_back_to_generic_label() -> anyhow::Result<()> {
1311        let corpus = TempDir::new()?;
1312        let project_dir = corpus.path().join("-tmp-pond-test");
1313        let parent_uuid = "22222222-2222-2222-2222-222222222222";
1314        let agent_hash = "deadbeef";
1315        std::fs::create_dir_all(project_dir.join(parent_uuid).join("subagents"))?;
1316        let row = serde_json::json!({
1317            "type": "user",
1318            "uuid": "u-sub-only",
1319            "sessionId": parent_uuid,
1320            "cwd": "/tmp/pond-test",
1321            "timestamp": "2026-05-16T00:00:00.000Z",
1322            "message": {"role": "user", "content": "no meta sibling here"},
1323        });
1324        std::fs::write(
1325            project_dir
1326                .join(parent_uuid)
1327                .join("subagents")
1328                .join(format!("agent-{agent_hash}.jsonl")),
1329            format!("{row}\n"),
1330        )?;
1331
1332        let store_dir = TempDir::new()?;
1333        let store = Store::open_local(store_dir.path()).await?;
1334        let adapter = ClaudeCodeAdapter::new(corpus.path());
1335        let _summary =
1336            ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1337
1338        let child = store
1339            .get_session(&format!("{parent_uuid}/agent-{agent_hash}"))
1340            .await?
1341            .expect("derived child id even without meta");
1342        assert_eq!(child.session.source_agent, "claude-code/subagent");
1343        Ok(())
1344    }
1345
1346    /// Nested workflow-runner subagent:
1347    ///   `<parent_uuid>/subagents/workflows/<wf-id>/agent-<hash>.jsonl`.
1348    /// Same parent `sessionId` in row content AND a shifted `cwd`. The adapter
1349    /// must derive a distinct child id from the FULL path under `subagents/`
1350    /// (not collapse onto the parent), so it neither collides on the immutable
1351    /// `project` nor silently merges into the parent. Regression for the
1352    /// workflow-layout sync rejection. See spec.md#datasets.
1353    #[tokio::test(flavor = "multi_thread")]
1354    async fn workflow_nested_subagent_derives_distinct_child_not_parent_collision()
1355    -> anyhow::Result<()> {
1356        let corpus = TempDir::new()?;
1357        let project_dir = corpus.path().join("-tmp-pond-test");
1358        let parent_uuid = "44444444-4444-4444-4444-444444444444";
1359        let wf_id = "wf_abcd1234-ef0";
1360        let agent_hash = "cafef00dbaadf00d1";
1361        let wf_dir = project_dir
1362            .join(parent_uuid)
1363            .join("subagents")
1364            .join("workflows")
1365            .join(wf_id);
1366        std::fs::create_dir_all(&wf_dir)?;
1367
1368        let parent_row = serde_json::json!({
1369            "type": "user",
1370            "uuid": "u-parent-1",
1371            "sessionId": parent_uuid,
1372            "cwd": "/tmp/pond-test",
1373            "timestamp": "2026-05-20T00:00:00.000Z",
1374            "message": {"role": "user", "content": "hi parent"},
1375        });
1376        std::fs::write(
1377            project_dir.join(format!("{parent_uuid}.jsonl")),
1378            format!("{parent_row}\n"),
1379        )?;
1380
1381        // Shifted cwd: pre-fix this collided with the parent's immutable project.
1382        let subagent_row = serde_json::json!({
1383            "type": "user",
1384            "uuid": "u-wf-sub-1",
1385            "sessionId": parent_uuid,
1386            "cwd": "/tmp/pond-test/packages/sub",
1387            "isSidechain": true,
1388            "agentId": agent_hash,
1389            "timestamp": "2026-05-20T00:01:00.000Z",
1390            "message": {"role": "user", "content": "workflow subagent prompt"},
1391        });
1392        std::fs::write(
1393            wf_dir.join(format!("agent-{agent_hash}.jsonl")),
1394            format!("{subagent_row}\n"),
1395        )?;
1396        std::fs::write(
1397            wf_dir.join(format!("agent-{agent_hash}.meta.json")),
1398            r#"{"agentType":"general-purpose","description":"workflow child"}"#,
1399        )?;
1400
1401        let store_dir = TempDir::new()?;
1402        let store = Store::open_local(store_dir.path()).await?;
1403        let adapter = ClaudeCodeAdapter::new(corpus.path());
1404        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1405        assert_eq!(
1406            summary.dropped_sessions, 0,
1407            "nested workflow subagent must NOT collide with the parent project",
1408        );
1409
1410        let parent = store
1411            .get_session(parent_uuid)
1412            .await?
1413            .expect("parent session ingests under the bare uuid");
1414        assert_eq!(&*parent.session.project, "/tmp/pond-test");
1415        assert_eq!(parent.session.parent_session_id, None);
1416
1417        let child_id = format!("{parent_uuid}/workflows/{wf_id}/agent-{agent_hash}");
1418        let child = store
1419            .get_session(&child_id)
1420            .await?
1421            .expect("workflow subagent surfaces under the full nested child id");
1422        assert_eq!(child.session.source_agent, "claude-code/general-purpose");
1423        assert_eq!(
1424            child.session.parent_session_id.as_deref(),
1425            Some(parent_uuid)
1426        );
1427        assert_eq!(
1428            &*child.session.project, "/tmp/pond-test/packages/sub",
1429            "child keeps its own cwd-derived project, distinct from the parent",
1430        );
1431        let subagent_meta = child
1432            .session
1433            .options
1434            .get("subagent")
1435            .expect("options.subagent present");
1436        assert_eq!(subagent_meta["hash"], serde_json::json!(agent_hash));
1437        Ok(())
1438    }
1439
1440    /// A `.jsonl` under `subagents/` whose leaf is NOT `agent-<hash>.jsonl` (a
1441    /// layout this pond version doesn't understand) must FAIL VISIBLY rather
1442    /// than fall back to its content `sessionId` (the parent's) and silently
1443    /// merge into the parent session. It is counted as an unsupported skip and
1444    /// contributes no rows. See spec.md#datasets.
1445    #[tokio::test(flavor = "multi_thread")]
1446    async fn unrecognized_subagents_file_fails_visibly_not_merged() -> anyhow::Result<()> {
1447        let corpus = TempDir::new()?;
1448        let project_dir = corpus.path().join("-tmp-pond-test");
1449        let parent_uuid = "55555555-5555-5555-5555-555555555555";
1450        let unknown_dir = project_dir
1451            .join(parent_uuid)
1452            .join("subagents")
1453            .join("workflows")
1454            .join("wf_future01-aaa");
1455        std::fs::create_dir_all(&unknown_dir)?;
1456
1457        let parent_row = serde_json::json!({
1458            "type": "user",
1459            "uuid": "u-parent-only",
1460            "sessionId": parent_uuid,
1461            "cwd": "/tmp/pond-test",
1462            "timestamp": "2026-05-20T00:00:00.000Z",
1463            "message": {"role": "user", "content": "parent message"},
1464        });
1465        std::fs::write(
1466            project_dir.join(format!("{parent_uuid}.jsonl")),
1467            format!("{parent_row}\n"),
1468        )?;
1469
1470        // Same parent sessionId AND same cwd: pre-guard this would have merged
1471        // silently into the parent. The leaf name is not `agent-<hash>.jsonl`.
1472        let unknown_row = serde_json::json!({
1473            "type": "user",
1474            "uuid": "u-should-not-merge",
1475            "sessionId": parent_uuid,
1476            "cwd": "/tmp/pond-test",
1477            "timestamp": "2026-05-20T00:02:00.000Z",
1478            "message": {"role": "user", "content": "must not land under parent"},
1479        });
1480        std::fs::write(
1481            unknown_dir.join("transcript-001.jsonl"),
1482            format!("{unknown_row}\n"),
1483        )?;
1484
1485        let store_dir = TempDir::new()?;
1486        let store = Store::open_local(store_dir.path()).await?;
1487        let adapter = ClaudeCodeAdapter::new(corpus.path());
1488        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1489
1490        assert_eq!(
1491            summary.skipped_files, 1,
1492            "the unrecognized subagents/ transcript must be a visible, counted skip",
1493        );
1494        let parent = store
1495            .get_session(parent_uuid)
1496            .await?
1497            .expect("parent session ingests");
1498        assert_eq!(
1499            parent.messages.len(),
1500            1,
1501            "the unrecognized file's row must NOT be merged into the parent session",
1502        );
1503        assert!(
1504            parent
1505                .messages
1506                .iter()
1507                .all(|m| m.message.id() != "u-should-not-merge"),
1508            "parent must not absorb the unrecognized file's message",
1509        );
1510        Ok(())
1511    }
1512
1513    /// Re-sync visibility: an unrecognized `subagents/` file must STILL surface as
1514    /// a visible `Unsupported` skip when the parent already carries a freshness
1515    /// watermark. Its content `sessionId` is the parent's, so peeking it would let
1516    /// the freshness gate skip the file as `Fresh` under the parent's watermark and
1517    /// hide the failure. `peek_session_id` returns `None` for it instead, keeping
1518    /// it out of the gate. Regression for the re-sync visibility leak. See
1519    /// spec.md#datasets.
1520    #[tokio::test(flavor = "multi_thread")]
1521    async fn unrecognized_subagents_file_stays_visible_under_parent_watermark() -> anyhow::Result<()>
1522    {
1523        struct ParentAlreadyFresh;
1524        impl crate::adapter::SkipOracle for ParentAlreadyFresh {
1525            fn last_ingested_at(&self, _session_id: &str) -> Option<DateTime<Utc>> {
1526                // Far-future watermark: every source mtime is `<= ingested`, so a
1527                // peeked id WOULD trip the freshness gate. The guard must keep the
1528                // unrecognized file out of the gate regardless.
1529                Some(
1530                    DateTime::parse_from_rfc3339("2999-01-01T00:00:00Z")
1531                        .unwrap()
1532                        .with_timezone(&Utc),
1533                )
1534            }
1535            fn is_empty(&self) -> bool {
1536                false
1537            }
1538        }
1539
1540        let corpus = TempDir::new()?;
1541        let project_dir = corpus.path().join("-tmp-pond-test");
1542        let parent_uuid = "66666666-6666-6666-6666-666666666666";
1543        let unknown_dir = project_dir
1544            .join(parent_uuid)
1545            .join("subagents")
1546            .join("workflows")
1547            .join("wf_future02-bbb");
1548        std::fs::create_dir_all(&unknown_dir)?;
1549
1550        let parent_row = serde_json::json!({
1551            "type": "user",
1552            "uuid": "u-parent-fresh",
1553            "sessionId": parent_uuid,
1554            "cwd": "/tmp/pond-test",
1555            "timestamp": "2026-05-20T00:00:00.000Z",
1556            "message": {"role": "user", "content": "parent message"},
1557        });
1558        std::fs::write(
1559            project_dir.join(format!("{parent_uuid}.jsonl")),
1560            format!("{parent_row}\n"),
1561        )?;
1562
1563        // Same parent sessionId, leaf not `agent-<hash>.jsonl`: pre-fix this would
1564        // peek the parent's id and be fresh-skipped under the far-future watermark.
1565        let unknown_row = serde_json::json!({
1566            "type": "user",
1567            "uuid": "u-resync-should-stay-visible",
1568            "sessionId": parent_uuid,
1569            "cwd": "/tmp/pond-test",
1570            "timestamp": "2026-05-20T00:02:00.000Z",
1571            "message": {"role": "user", "content": "must stay visible"},
1572        });
1573        std::fs::write(
1574            unknown_dir.join("transcript-002.jsonl"),
1575            format!("{unknown_row}\n"),
1576        )?;
1577
1578        let store_dir = TempDir::new()?;
1579        let store = Store::open_local(store_dir.path()).await?;
1580        let adapter = ClaudeCodeAdapter::new(corpus.path());
1581        let summary = ingest_adapter(&store, &adapter, &ParentAlreadyFresh, |_| {}).await?;
1582
1583        assert_eq!(
1584            summary.skipped_files, 1,
1585            "the unrecognized transcript must stay a visible Unsupported skip, not be fresh-skipped under the parent's watermark",
1586        );
1587        // The parent file legitimately fresh-skips under the far-future watermark;
1588        // the unrecognized file must NOT join it (pre-fix `skipped_fresh` would be 2).
1589        assert_eq!(
1590            summary.skipped_fresh, 1,
1591            "only the parent may fresh-skip; the unrecognized file must not borrow its watermark",
1592        );
1593        Ok(())
1594    }
1595
1596    /// Three rows with the same `uuid` (the claude-code `/resume` replay
1597    /// pattern). The adapter must dedupe at the file-state level so the
1598    /// validator never sees the duplicates; `dropped_events` stays 0 and
1599    /// `inserted` covers the single canonical row.
1600    #[tokio::test(flavor = "multi_thread")]
1601    async fn replay_duplicates_are_dedup_at_adapter_layer() -> anyhow::Result<()> {
1602        let corpus = TempDir::new()?;
1603        let project_dir = corpus.path().join("-tmp-pond-test");
1604        std::fs::create_dir_all(&project_dir)?;
1605        let session_uuid = "33333333-3333-3333-3333-333333333333";
1606        let dup_uuid = "u-shared-1";
1607        let row = serde_json::json!({
1608            "type": "user",
1609            "uuid": dup_uuid,
1610            "sessionId": session_uuid,
1611            "cwd": "/tmp/pond-test",
1612            "timestamp": "2026-05-16T00:00:00.000Z",
1613            "message": {"role": "user", "content": "replayed three times"},
1614        });
1615        // Three identical rows back-to-back, same uuid.
1616        let body = format!("{row}\n{row}\n{row}\n");
1617        std::fs::write(project_dir.join(format!("{session_uuid}.jsonl")), body)?;
1618
1619        let store_dir = TempDir::new()?;
1620        let store = Store::open_local(store_dir.path()).await?;
1621        let adapter = ClaudeCodeAdapter::new(corpus.path());
1622        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1623
1624        assert_eq!(
1625            summary.dropped_events, 0,
1626            "adapter must dedupe replays before they reach the validator"
1627        );
1628        assert!(
1629            !summary
1630                .drop_reasons
1631                .contains_key(crate::sessions::DROP_REASON_DUPLICATE_MESSAGE_ID),
1632            "duplicate_message_id bucket stays empty when adapter does its job"
1633        );
1634        Ok(())
1635    }
1636
1637    /// One assistant `tool_use` followed by a user `tool_result` in the same
1638    /// file. The adapter's per-file `tool_use_id -> name` map must resolve the
1639    /// result's tool name to the call's name. Pre-fix: synthesized `"unknown"`.
1640    #[tokio::test(flavor = "multi_thread")]
1641    async fn tool_result_name_resolves_from_prior_tool_use_in_same_file() -> anyhow::Result<()> {
1642        let corpus = TempDir::new()?;
1643        let project_dir = corpus.path().join("-tmp-pond-test");
1644        std::fs::create_dir_all(&project_dir)?;
1645        let session_uuid = "44444444-4444-4444-4444-444444444444";
1646        let call_id = "toolu_test_01";
1647
1648        let tool_use_row = serde_json::json!({
1649            "type": "assistant",
1650            "uuid": "u-call",
1651            "sessionId": session_uuid,
1652            "cwd": "/tmp/pond-test",
1653            "timestamp": "2026-05-16T00:00:00.000Z",
1654            "message": {
1655                "role": "assistant",
1656                "content": [{
1657                    "type": "tool_use",
1658                    "id": call_id,
1659                    "name": "Edit",
1660                    "input": {"file_path": "/tmp/foo"},
1661                }],
1662            },
1663        });
1664        let tool_result_row = serde_json::json!({
1665            "type": "user",
1666            "uuid": "u-result",
1667            "sessionId": session_uuid,
1668            "cwd": "/tmp/pond-test",
1669            "timestamp": "2026-05-16T00:00:01.000Z",
1670            "message": {
1671                "role": "user",
1672                "content": [{
1673                    "type": "tool_result",
1674                    "tool_use_id": call_id,
1675                    "content": "ok",
1676                }],
1677            },
1678        });
1679        std::fs::write(
1680            project_dir.join(format!("{session_uuid}.jsonl")),
1681            format!("{tool_use_row}\n{tool_result_row}\n"),
1682        )?;
1683
1684        let store_dir = TempDir::new()?;
1685        let store = Store::open_local(store_dir.path()).await?;
1686        let adapter = ClaudeCodeAdapter::new(corpus.path());
1687        let _summary =
1688            ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1689        let session = store
1690            .get_session(session_uuid)
1691            .await?
1692            .expect("session ingests");
1693
1694        let mut saw_call = false;
1695        let mut saw_result = false;
1696        for stored in &session.messages {
1697            for part in &stored.parts {
1698                match &part.kind {
1699                    PartKind::ToolCall {
1700                        call_id: cid, name, ..
1701                    } => {
1702                        assert_eq!(cid.as_ref().map(|e| e.as_str()), Some(call_id));
1703                        assert_eq!(
1704                            name.as_ref().map(|e| e.as_str()),
1705                            Some("Edit"),
1706                            "tool_use carries the name directly"
1707                        );
1708                        saw_call = true;
1709                    }
1710                    PartKind::ToolResult {
1711                        call_id: cid, name, ..
1712                    } => {
1713                        assert_eq!(cid.as_ref().map(|e| e.as_str()), Some(call_id));
1714                        assert_eq!(
1715                            name.as_ref().map(|e| e.as_str()),
1716                            Some("Edit"),
1717                            "tool_result resolves the name via the per-file map (was 'unknown' pre-2026-05-16)"
1718                        );
1719                        saw_result = true;
1720                    }
1721                    _ => {}
1722                }
1723            }
1724        }
1725        assert!(saw_call && saw_result, "both parts must be present");
1726        Ok(())
1727    }
1728
1729    /// spec.md#model-part-provenance: a genuine human prompt classifies
1730    /// `conversational`; a harness `<task-notification>` user-slot turn and an
1731    /// `isMeta` row classify `injected`.
1732    #[test]
1733    fn user_text_provenance_separates_prompts_from_harness_injection() {
1734        let prompt = json!({"type": "user", "uuid": "u1"});
1735        assert_eq!(
1736            user_text_provenance(&prompt, "please refactor the parser"),
1737            Provenance::Conversational,
1738        );
1739
1740        let notification = json!({"type": "user", "uuid": "u2"});
1741        assert_eq!(
1742            user_text_provenance(
1743                &notification,
1744                "<task-notification>background task done</task-notification>",
1745            ),
1746            Provenance::Injected,
1747        );
1748
1749        let meta = json!({"type": "user", "uuid": "u3", "isMeta": true});
1750        assert_eq!(
1751            user_text_provenance(&meta, "expanded skill body"),
1752            Provenance::Injected,
1753        );
1754    }
1755
1756    /// Ingest a session carrying a `<task-notification>` user message and a
1757    /// genuine prompt; the notification's part must be `injected` and the
1758    /// prompt's `conversational` (spec.md#model-part-provenance).
1759    #[tokio::test(flavor = "multi_thread")]
1760    async fn task_notification_message_yields_injected_parts() -> anyhow::Result<()> {
1761        let corpus = TempDir::new()?;
1762        let project_dir = corpus.path().join("-tmp-pond-test");
1763        std::fs::create_dir_all(&project_dir)?;
1764        let session_uuid = "66666666-6666-6666-6666-666666666666";
1765        let prompt = serde_json::json!({
1766            "type": "user",
1767            "uuid": "u-prompt",
1768            "sessionId": session_uuid,
1769            "cwd": "/tmp/pond-test",
1770            "timestamp": "2026-05-16T00:00:00.000Z",
1771            "message": {"role": "user", "content": "genuine human prompt"},
1772        });
1773        let notification = serde_json::json!({
1774            "type": "user",
1775            "uuid": "u-notify",
1776            "sessionId": session_uuid,
1777            "cwd": "/tmp/pond-test",
1778            "timestamp": "2026-05-16T00:00:01.000Z",
1779            "message": {
1780                "role": "user",
1781                "content": "<task-notification>a background task finished</task-notification>",
1782            },
1783        });
1784        std::fs::write(
1785            project_dir.join(format!("{session_uuid}.jsonl")),
1786            format!("{prompt}\n{notification}\n"),
1787        )?;
1788
1789        let store_dir = TempDir::new()?;
1790        let store = Store::open_local(store_dir.path()).await?;
1791        let adapter = ClaudeCodeAdapter::new(corpus.path());
1792        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1793
1794        let session = store
1795            .get_session(session_uuid)
1796            .await?
1797            .expect("session ingests");
1798        let mut saw_prompt = false;
1799        let mut saw_notification = false;
1800        for stored in &session.messages {
1801            for part in &stored.parts {
1802                if stored.message.id() == "u-prompt" {
1803                    assert_eq!(part.provenance, crate::wire::Provenance::Conversational);
1804                    saw_prompt = true;
1805                }
1806                if stored.message.id() == "u-notify" {
1807                    assert_eq!(part.provenance, crate::wire::Provenance::Injected);
1808                    saw_notification = true;
1809                }
1810            }
1811        }
1812        assert!(saw_prompt && saw_notification, "both messages present");
1813        Ok(())
1814    }
1815
1816    /// Orphan tool_result with no earlier tool_use in the same file: the
1817    /// per-file map can't resolve. The adapter must emit `name: None`, NOT
1818    /// the old `"unknown"` sentinel. Invariant 15 (no synthesized values).
1819    #[tokio::test(flavor = "multi_thread")]
1820    async fn orphan_tool_result_yields_name_none_not_unknown_sentinel() -> anyhow::Result<()> {
1821        let corpus = TempDir::new()?;
1822        let project_dir = corpus.path().join("-tmp-pond-test");
1823        std::fs::create_dir_all(&project_dir)?;
1824        let session_uuid = "55555555-5555-5555-5555-555555555555";
1825
1826        // tool_result with no earlier tool_use (simulates a compaction-pruned call).
1827        let row = serde_json::json!({
1828            "type": "user",
1829            "uuid": "u-orphan",
1830            "sessionId": session_uuid,
1831            "cwd": "/tmp/pond-test",
1832            "timestamp": "2026-05-16T00:00:00.000Z",
1833            "message": {
1834                "role": "user",
1835                "content": [{
1836                    "type": "tool_result",
1837                    "tool_use_id": "toolu_orphan",
1838                    "content": "result body, no matching call",
1839                }],
1840            },
1841        });
1842        std::fs::write(
1843            project_dir.join(format!("{session_uuid}.jsonl")),
1844            format!("{row}\n"),
1845        )?;
1846
1847        let store_dir = TempDir::new()?;
1848        let store = Store::open_local(store_dir.path()).await?;
1849        let adapter = ClaudeCodeAdapter::new(corpus.path());
1850        let _summary =
1851            ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1852        let session = store
1853            .get_session(session_uuid)
1854            .await?
1855            .expect("session ingests");
1856        let mut found = false;
1857        for stored in &session.messages {
1858            for part in &stored.parts {
1859                if let PartKind::ToolResult { name, call_id, .. } = &part.kind {
1860                    assert_eq!(call_id.as_ref().map(|e| e.as_str()), Some("toolu_orphan"));
1861                    assert!(
1862                        name.is_none(),
1863                        "orphan tool_result must be name=None, not synthesized 'unknown'",
1864                    );
1865                    found = true;
1866                }
1867            }
1868        }
1869        assert!(found, "orphan tool_result part must be present");
1870        // Sanity: even an orphan should not be reported as a drop.
1871        Ok(())
1872    }
1873
1874    /// The Workflow runner's `journal.jsonl` under `subagents/workflows/<wf>/`
1875    /// is a known control file, not an unrecognized transcript - it must NOT be
1876    /// flagged unsupported (it falls through to a benign Empty skip).
1877    #[test]
1878    fn workflow_journal_is_a_control_file_not_unsupported() {
1879        let adapter = ClaudeCodeAdapter::new("/tmp/pond-test-root");
1880        let journal = std::path::Path::new(
1881            "/root/-proj/55555555-5555-5555-5555-555555555555/subagents/workflows/wf_030e6487-da6/journal.jsonl",
1882        );
1883        assert!(is_workflow_control_file(journal));
1884        assert!(
1885            adapter.unsupported_reason(journal).is_none(),
1886            "journal.jsonl is a known control file, not an unsupported layout",
1887        );
1888    }
1889
1890    /// Regression guard against narrowing the net too far: a genuinely unknown
1891    /// leaf under `subagents/` is still flagged unsupported, while a recognized
1892    /// `agent-<hash>.jsonl` is not.
1893    #[test]
1894    fn unknown_subagents_leaf_is_still_unsupported() {
1895        let adapter = ClaudeCodeAdapter::new("/tmp/pond-test-root");
1896        let unknown = std::path::Path::new(
1897            "/root/-proj/PARENT/subagents/workflows/wf_x/transcript-001.jsonl",
1898        );
1899        assert!(
1900            adapter.unsupported_reason(unknown).is_some(),
1901            "an unrecognized non-agent, non-journal leaf must still fail visibly",
1902        );
1903        assert!(!is_workflow_control_file(unknown));
1904
1905        let agent = std::path::Path::new("/root/-proj/PARENT/subagents/agent-abc123def456.jsonl");
1906        assert!(
1907            adapter.unsupported_reason(agent).is_none(),
1908            "a recognized agent transcript is resolvable, not unsupported",
1909        );
1910    }
1911
1912    /// End-to-end: a workflow dir holding both a real `agent-<hash>.jsonl`
1913    /// transcript and the runner's `journal.jsonl`. The agent transcript
1914    /// ingests as a child session; the journal is a benign skip (no
1915    /// `skipped_files` failure) and its rows never merge into the parent.
1916    #[tokio::test(flavor = "multi_thread")]
1917    async fn workflow_journal_skipped_benignly_while_sibling_agent_ingests() -> anyhow::Result<()> {
1918        let corpus = TempDir::new()?;
1919        let project_dir = corpus.path().join("-tmp-pond-test");
1920        let parent_uuid = "77777777-7777-7777-7777-777777777777";
1921        let wf_id = "wf_030e6487-da6";
1922        let agent_hash = "a38f4724ef3864da8";
1923        let wf_dir = project_dir
1924            .join(parent_uuid)
1925            .join("subagents")
1926            .join("workflows")
1927            .join(wf_id);
1928        std::fs::create_dir_all(&wf_dir)?;
1929
1930        let parent_row = serde_json::json!({
1931            "type": "user",
1932            "uuid": "u-parent-1",
1933            "sessionId": parent_uuid,
1934            "cwd": "/tmp/pond-test",
1935            "timestamp": "2026-06-04T00:00:00.000Z",
1936            "message": {"role": "user", "content": "hi parent"},
1937        });
1938        std::fs::write(
1939            project_dir.join(format!("{parent_uuid}.jsonl")),
1940            format!("{parent_row}\n"),
1941        )?;
1942
1943        let agent_row = serde_json::json!({
1944            "type": "user",
1945            "uuid": "u-agent-1",
1946            "sessionId": parent_uuid,
1947            "cwd": "/tmp/pond-test",
1948            "timestamp": "2026-06-04T00:01:00.000Z",
1949            "message": {"role": "user", "content": "workflow agent prompt"},
1950        });
1951        std::fs::write(
1952            wf_dir.join(format!("agent-{agent_hash}.jsonl")),
1953            format!("{agent_row}\n"),
1954        )?;
1955
1956        // The Workflow journal: control events only, no sessionId.
1957        std::fs::write(
1958            wf_dir.join("journal.jsonl"),
1959            "{\"type\":\"started\",\"key\":\"v2:abc\",\"agentId\":\"a38f\"}\n\
1960             {\"type\":\"result\",\"key\":\"v2:abc\",\"agentId\":\"a38f\",\"result\":{}}\n",
1961        )?;
1962
1963        let store_dir = TempDir::new()?;
1964        let store = Store::open_local(store_dir.path()).await?;
1965        let adapter = ClaudeCodeAdapter::new(corpus.path());
1966        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1967        assert_eq!(
1968            summary.skipped_files, 0,
1969            "journal.jsonl is a control file (benign Empty skip), not an unsupported failure",
1970        );
1971
1972        let child = store
1973            .get_session(&format!(
1974                "{parent_uuid}/workflows/{wf_id}/agent-{agent_hash}"
1975            ))
1976            .await?
1977            .expect("the sibling agent transcript still ingests as a child session");
1978        assert_eq!(
1979            child.session.parent_session_id.as_deref(),
1980            Some(parent_uuid)
1981        );
1982
1983        let parent = store
1984            .get_session(parent_uuid)
1985            .await?
1986            .expect("parent session ingests");
1987        assert_eq!(
1988            parent.messages.len(),
1989            1,
1990            "journal rows must NOT merge into the parent session",
1991        );
1992        Ok(())
1993    }
1994
1995    /// Hardening: even a journal.jsonl whose rows DO carry the parent
1996    /// `sessionId` must not merge - the guard is structural, not contingent on
1997    /// the journal lacking one.
1998    #[tokio::test(flavor = "multi_thread")]
1999    async fn workflow_journal_with_parent_sessionid_still_not_merged() -> anyhow::Result<()> {
2000        let corpus = TempDir::new()?;
2001        let project_dir = corpus.path().join("-tmp-pond-test");
2002        let parent_uuid = "88888888-8888-8888-8888-888888888888";
2003        let wf_dir = project_dir
2004            .join(parent_uuid)
2005            .join("subagents")
2006            .join("workflows")
2007            .join("wf_abc01234-def");
2008        std::fs::create_dir_all(&wf_dir)?;
2009
2010        let parent_row = serde_json::json!({
2011            "type": "user",
2012            "uuid": "u-parent",
2013            "sessionId": parent_uuid,
2014            "cwd": "/tmp/pond-test",
2015            "timestamp": "2026-06-04T00:00:00.000Z",
2016            "message": {"role": "user", "content": "parent only"},
2017        });
2018        std::fs::write(
2019            project_dir.join(format!("{parent_uuid}.jsonl")),
2020            format!("{parent_row}\n"),
2021        )?;
2022
2023        // A journal carrying the PARENT sessionId (hypothetical future shape):
2024        // the structural guard must still refuse to merge it.
2025        let journal_row = serde_json::json!({
2026            "type": "started",
2027            "key": "v2:abc",
2028            "agentId": "a1",
2029            "sessionId": parent_uuid,
2030            "message": {"role": "user", "content": "must not merge"},
2031        });
2032        std::fs::write(wf_dir.join("journal.jsonl"), format!("{journal_row}\n"))?;
2033
2034        let store_dir = TempDir::new()?;
2035        let store = Store::open_local(store_dir.path()).await?;
2036        let adapter = ClaudeCodeAdapter::new(corpus.path());
2037        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
2038        assert_eq!(
2039            summary.skipped_files, 0,
2040            "journal is a benign Empty skip, not an unsupported failure",
2041        );
2042        let parent = store
2043            .get_session(parent_uuid)
2044            .await?
2045            .expect("parent session ingests");
2046        assert_eq!(
2047            parent.messages.len(),
2048            1,
2049            "journal row must NOT merge even when it carries the parent sessionId",
2050        );
2051        Ok(())
2052    }
2053}