Skip to main content

pond/adapter/
claude_code.rs

1//! Claude Code CLI adapter.
2//!
3//! Source path: `~/.claude/projects/<encoded-project-path>/<session-uuid>.jsonl`.
4//! Each `.jsonl` file is one session; lines are typed entries linked via a
5//! `parentUuid` -> `uuid` chain. Tool results arrive as `user` entries whose
6//! `message.content[]` contains `tool_result` blocks with a parallel
7//! `toolUseResult` field carrying structured data.
8
9use std::{
10    collections::{HashMap, HashSet},
11    path::{Path, PathBuf},
12};
13
14use anyhow::Context as _;
15use chrono::{DateTime, SecondsFormat, Utc};
16use serde_json::{Value, json};
17
18use crate::{
19    sessions::IngestEvent,
20    wire::{FileData, Message, Part, PartKind, Provenance, ProviderOptions, Session},
21};
22
23use super::{
24    Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
25    RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
26    empty_options,
27    extract::{
28        Extracted, Source, extract_compact_repr, extract_raw_record, extract_self_str, extract_str,
29    },
30    extracted_text,
31    jsonl::{BoundedRow, JsonlTree, jsonl_tree_discover, jsonl_tree_events, source_line},
32    jsonl_bytes, part_id, part_ordinal, raw_record,
33};
34
/// Per-file streaming state that persists across rows of one JSONL file.
/// Lives inside [`Adapter::events`]'s per-file loop and is reset whenever
/// the loop advances to the next file.
///
/// Two responsibilities:
///
/// 1. **Replay dedup.** Claude Code's `/resume` and `/compact` paths
///    occasionally re-emit byte-identical rows with the same `uuid` (the
///    stale-`messageSet`-cache bug in claude-code, see
///    `utils/sessionStorage.ts`). The adapter dedupes here so the validator
///    never sees the same PK twice; the validator's HashSet stays a safety
///    net.
///
/// 2. **`tool_use_id -> tool name` resolution.** The raw `tool_result` row
///    carries only `tool_use_id`, not the tool name; the name lives on the
///    prior `tool_use` row in the same file. We populate this map when we
///    see a `tool_use` part, then look it up when we see the matching
///    `tool_result` part. Misses (e.g. compaction pruned the tool_use)
///    surface as `name: None` in `PartKind::ToolResult` rather than the
///    old `"unknown"` sentinel - faithful to the source rather than
///    inventing a value.
#[derive(Debug, Default)]
pub(crate) struct FileState {
    /// Row `uuid`s already processed in this file. A row whose `uuid` is
    /// already present yields no events.
    seen_uuids: HashSet<String>,
    /// `tool_use` / `server_tool_use` block `id` -> tool `name`, filled by
    /// `capture_tool_call_names` as rows stream past.
    tool_call_names: HashMap<String, Extracted<String>>,
}
60
/// Stable adapter name. Surfaces as the `[sources.claude-code]` config key,
/// the `pond sync claude-code` CLI arg, and `Session.source_agent` on every
/// emitted row.
///
/// Within this file it is what both `name()` implementations return and the
/// adapter tag passed to `config_path`, `jsonl_bytes` and
/// `AdapterError::schema`.
const NAME: &str = "claude-code";
65
/// Stateless factory: opens [`ClaudeCodeAdapter`] instances from config and
/// probes for the canonical install location under `~/.claude/projects`.
/// Also the entry point for serializing a stored session back into Claude
/// Code's on-disk layout (see its `AdapterFactory::serialize` impl).
pub struct ClaudeCodeFactory;
69
70impl AdapterFactory for ClaudeCodeFactory {
71    fn name(&self) -> &'static str {
72        NAME
73    }
74
75    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
76        Ok(Box::new(ClaudeCodeAdapter::new(config_path(NAME, config)?)))
77    }
78
79    fn probe_default(&self, env: &Env) -> Option<Value> {
80        let path = env.home.join(".claude").join("projects");
81        path.exists().then(|| json!({ "path": path }))
82    }
83
84    fn serialize(
85        &self,
86        session: &crate::sessions::SessionWithMessages,
87        fidelity: RestoreFidelity,
88    ) -> Result<Vec<RestoredFile>, AdapterError> {
89        serialize_session(session, fidelity)
90    }
91}
92
93fn serialize_session(
94    session: &crate::sessions::SessionWithMessages,
95    fidelity: RestoreFidelity,
96) -> Result<Vec<RestoredFile>, AdapterError> {
97    let mut messages = session.messages.clone();
98    if fidelity == RestoreFidelity::Native {
99        messages.sort_by(|left, right| {
100            source_line(left.message.options())
101                .cmp(&source_line(right.message.options()))
102                .then_with(|| by_timestamp_then_id(left, right))
103        });
104    } else {
105        messages.sort_by(by_timestamp_then_id);
106    }
107    // Native replays the verbatim `options.source.raw_record`; `claude_record`
108    // below is foreign-only. Replay echoes a frozen snapshot - safe only while
109    // canonical is append-only (spec.md#adapter-integrity-additive-sync).
110    let mut records = Vec::with_capacity(messages.len());
111    let mut parent_uuid = None::<String>;
112    for message in &messages {
113        if fidelity == RestoreFidelity::Native
114            && let Some(raw) = raw_record(message.message.options())
115        {
116            parent_uuid = raw
117                .get("uuid")
118                .and_then(Value::as_str)
119                .map(ToOwned::to_owned)
120                .or(parent_uuid);
121            records.push(raw);
122            continue;
123        }
124        // `claude_record` returns `None` for a dropped System message;
125        // `parent_uuid` then stays put so the chain skips over the gap.
126        let Some(record) = claude_record(session, message, parent_uuid.as_deref()) else {
127            continue;
128        };
129        parent_uuid = record
130            .get("uuid")
131            .and_then(Value::as_str)
132            .map(ToOwned::to_owned);
133        records.push(record);
134    }
135
136    let mut files = vec![RestoredFile::new(
137        claude_relative_path(session),
138        jsonl_bytes(NAME, &records)?,
139        fidelity,
140    )];
141    if session.session.parent_session_id.is_some()
142        && let Some(meta) = subagent_meta_record(session)
143    {
144        let mut meta_path = files[0].relative_path.clone();
145        meta_path.set_extension("meta.json");
146        files.push(RestoredFile::new(
147            meta_path,
148            serde_json::to_vec(&meta).map_err(|err| {
149                AdapterError::schema(
150                    NAME,
151                    &session.session.id,
152                    format!("json encode failed: {err}"),
153                )
154            })?,
155            fidelity,
156        ));
157    }
158    Ok(files)
159}
160
161fn claude_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
162    let encoded_project = session
163        .session
164        .options
165        .get("source")
166        .and_then(|source| source.get("project_dir"))
167        .and_then(Value::as_str)
168        .map(ToOwned::to_owned)
169        .unwrap_or_else(|| encode_project(&session.session.project));
170    if let Some(parent) = &session.session.parent_session_id {
171        // The child id is `<parent>/<child_suffix>`; the suffix is the file's
172        // path under `subagents/` (`agent-<hash>` flat, or
173        // `workflows/<wf-id>/agent-<hash>` nested), so stripping the parent
174        // prefix reconstructs the on-disk path verbatim.
175        let child_suffix = session
176            .session
177            .id
178            .strip_prefix(&format!("{parent}/"))
179            .unwrap_or(&session.session.id);
180        return PathBuf::from(encoded_project)
181            .join(parent)
182            .join("subagents")
183            .join(format!("{child_suffix}.jsonl"));
184    }
185    PathBuf::from(encoded_project).join(format!("{}.jsonl", session.session.id))
186}
187
/// Turn a workspace path into a project directory name by mapping every `/`
/// and `.` to `-`; all other characters pass through unchanged.
fn encode_project(project: &str) -> String {
    project
        .chars()
        .map(|ch| if matches!(ch, '/' | '.') { '-' } else { ch })
        .collect()
}
191
192fn subagent_meta_record(session: &crate::sessions::SessionWithMessages) -> Option<Value> {
193    // Restore the sidecar `.meta.json` verbatim from the stored copy. A
194    // subagent ingested without a meta file stored `meta: null` - nothing
195    // to write back.
196    let meta = session.session.options.get("subagent")?.get("meta")?;
197    meta.is_object().then(|| meta.clone())
198}
199
200fn claude_record(
201    session: &crate::sessions::SessionWithMessages,
202    message: &crate::sessions::MessageWithParts,
203    parent_uuid: Option<&str>,
204) -> Option<Value> {
205    // Foreign restore into Claude Code (native restore re-emits the stored
206    // `raw_record` and never reaches here). Claude Code's transcript has only
207    // `user` and `assistant` rows: a tool result is a `user` row, and there
208    // is no in-transcript system turn - a System message (a rule-3 carrier or
209    // a source's own system/developer turn) has no idiomatic home and is
210    // dropped; the content stays in canonical (spec.md#adapter-native-restore-lossless,
211    // foreign clause).
212    let row_role = match &message.message {
213        Message::System { .. } => return None,
214        Message::User { .. } | Message::Tool { .. } => "user",
215        Message::Assistant { .. } => "assistant",
216    };
217    let mut envelope = serde_json::Map::new();
218    envelope.insert("role".to_owned(), Value::String(row_role.to_owned()));
219    if row_role == "assistant" {
220        // `type:"message"` is the Anthropic Messages API object discriminator
221        // - a constant, always present on a real assistant row.
222        envelope.insert("type".to_owned(), Value::String("message".to_owned()));
223    }
224    envelope.insert(
225        "content".to_owned(),
226        Value::Array(message.parts.iter().map(claude_part).collect()),
227    );
228    Some(json!({
229        "parentUuid": parent_uuid,
230        "isSidechain": false,
231        "userType": "external",
232        "cwd": &*session.session.project,
233        "sessionId": &session.session.id,
234        "type": row_role,
235        "message": Value::Object(envelope),
236        "uuid": message.message.id(),
237        "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
238    }))
239}
240
241fn claude_part(part: &Part) -> Value {
242    match &part.kind {
243        PartKind::Text { text } => json!({"type": "text", "text": extracted_text(text)}),
244        PartKind::Reasoning { text } => {
245            json!({"type": "thinking", "thinking": extracted_text(text)})
246        }
247        PartKind::ToolCall {
248            call_id,
249            name,
250            params,
251            provider_executed,
252        } => json!({
253            "type": if *provider_executed { "server_tool_use" } else { "tool_use" },
254            "id": extracted_text(call_id),
255            "name": extracted_text(name),
256            "input": params,
257        }),
258        PartKind::ToolResult {
259            call_id,
260            is_failure,
261            result,
262            ..
263        } => json!({
264            "type": "tool_result",
265            "tool_use_id": extracted_text(call_id),
266            "is_error": is_failure,
267            "content": result,
268        }),
269        PartKind::File {
270            media_type,
271            file_name,
272            data,
273        } => json!({
274            "type": "file",
275            "media_type": media_type,
276            "file_name": file_name,
277            "source": file_source(data),
278        }),
279        other => {
280            json!({"type": "text", "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null))})
281        }
282    }
283}
284
285fn file_source(data: &FileData) -> Value {
286    match data {
287        FileData::String(value) => json!({"type": "text", "data": value}),
288        FileData::Bytes(value) => json!({"type": "base64", "data": value}),
289        FileData::Url(value) => json!({"type": "url", "url": value}),
290    }
291}
292
/// Configured claude-code reader. Walks a tree of `*.jsonl` files under
/// [`Self::root`] and yields canonical events in source order per session.
#[derive(Debug, Clone)]
pub struct ClaudeCodeAdapter {
    /// Directory the transcript tree is read from.
    root: PathBuf,
}

impl ClaudeCodeAdapter {
    /// Create an adapter reading from `root`; accepts anything convertible
    /// into a [`PathBuf`].
    pub fn new(root: impl Into<PathBuf>) -> Self {
        let root = root.into();
        Self { root }
    }
}
305
impl Adapter for ClaudeCodeAdapter {
    /// Discovery is delegated to the shared JSONL-tree helper, which is
    /// driven by this type's `JsonlTree` implementation.
    fn discover(&self) -> DiscoverFuture<'_> {
        jsonl_tree_discover(self)
    }

    /// Event streaming is likewise delegated to the shared JSONL-tree
    /// helper; `oracle` is passed through to it unchanged.
    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
        jsonl_tree_events(self, oracle)
    }
}
315
316impl JsonlTree for ClaudeCodeAdapter {
317    type State = FileState;
318
319    fn name(&self) -> &'static str {
320        NAME
321    }
322
323    fn root(&self) -> &Path {
324        &self.root
325    }
326
327    fn peek_session_id(&self, path: &Path, first_line: &str) -> Option<String> {
328        // A file under `subagents/` takes its id from the path, never from the
329        // row's content `sessionId` (that's the parent's). A recognized child
330        // peeks to its child id; an unrecognized one returns `None` so it stays
331        // out of the freshness gate and its `unsupported_reason` failure
332        // re-surfaces on every sync rather than being skipped as `Fresh` under
333        // the parent's borrowed watermark. See spec.md#datasets.
334        if subagents_dir(path).is_some() {
335            let (parent_uuid, child_suffix, _) = subagent_ids(path)?;
336            return Some(format!("{parent_uuid}/{child_suffix}"));
337        }
338        let row: Value = serde_json::from_str(first_line).ok()?;
339        row.get("sessionId")?.as_str().map(ToOwned::to_owned)
340    }
341
342    fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
343        session_from_rows(path, rows)
344    }
345
346    fn events_from_row(
347        &self,
348        session: &Session,
349        row: &BoundedRow,
350        state: &mut Self::State,
351    ) -> Result<Vec<IngestEvent>, String> {
352        if let Some(uuid) = row.value.get("uuid").and_then(Value::as_str)
353            && !state.seen_uuids.insert(uuid.to_owned())
354        {
355            return Ok(Vec::new());
356        }
357        capture_tool_call_names(&row.value, &mut state.tool_call_names);
358        events_from_row(&session.id, row.line, &row.value, session.created_at, state)
359    }
360
361    fn unsupported_reason(&self, path: &Path) -> Option<String> {
362        // A `.jsonl` under a `subagents/` ancestor that we can't resolve to a
363        // child id (its leaf isn't `agent-<hash>.jsonl`) must NOT fall back to
364        // its content `sessionId` - that id is the parent's, so it would
365        // silently merge into the parent session. Fail visibly and wait for an
366        // adapter update instead. See spec.md#datasets.
367        if subagents_dir(path).is_some() && subagent_ids(path).is_none() {
368            return Some(format!(
369                "{}: subagent transcript layout not recognized by this pond version; \
370                 skipped so it is not merged into the parent session - update pond and \
371                 re-run `pond sync`",
372                path.display()
373            ));
374        }
375        None
376    }
377}
378
379/// Walk one raw row's `message.content[]` array (if any) and stash every
380/// `tool_use` part's `id -> name` mapping into the per-file map. Idempotent
381/// and safe to call on every row regardless of role; non-assistant rows
382/// just don't contribute entries.
383fn capture_tool_call_names(row: &Value, map: &mut HashMap<String, Extracted<String>>) {
384    let Some(items) = row
385        .get("message")
386        .and_then(|message| message.get("content"))
387        .and_then(Value::as_array)
388    else {
389        return;
390    };
391    for item in items {
392        let kind = item.get("type").and_then(Value::as_str);
393        if !matches!(kind, Some("tool_use") | Some("server_tool_use")) {
394            continue;
395        }
396        let (Some(id), Some(name)) = (item.str_field("id"), extract_str(item, "name")) else {
397            continue;
398        };
399        map.insert(id.to_owned(), name);
400    }
401}
402
403fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
404    let path_display = path.display().to_string();
405    let mut created_at = None;
406    let mut project: Option<Extracted<String>> = None;
407    let mut version = None;
408    for row in rows {
409        if created_at.is_none() {
410            created_at = parse_timestamp(&row.value).ok();
411        }
412        if project.is_none() {
413            project = extract_str(&row.value, "cwd");
414        }
415        if version.is_none() {
416            version = row
417                .value
418                .get("version")
419                .and_then(Value::as_str)
420                .map(ToOwned::to_owned);
421        }
422    }
423
424    let first = rows
425        .first()
426        .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
427    let at_first = format!("{path_display}:{}", first.line);
428    let raw_session_id = first
429        .value
430        .get("sessionId")
431        .and_then(Value::as_str)
432        .ok_or_else(|| {
433            AdapterError::schema(
434                NAME,
435                at_first.clone(),
436                format!("line {} missing sessionId", first.line),
437            )
438        })?
439        .to_owned();
440    let created_at = created_at.ok_or_else(|| {
441        AdapterError::schema(NAME, at_first, "session has no parseable timestamp")
442    })?;
443
444    // Subagent detection. Claude Code stores each subagent's transcript under
445    // the session's `subagents/` sidecar - either flat
446    // (`<parent_dir>/<parent_uuid>/subagents/agent-<hash>.jsonl`) or, for the
447    // workflow runner, nested
448    // (`.../subagents/workflows/<wf-id>/agent-<hash>.jsonl`) - with a sibling
449    // `agent-<hash>.meta.json` carrying `{agentType, description}`. Every such
450    // file shares the parent's `sessionId` in row content, so ingesting it under
451    // that id collides with the parent (the validator's "project is immutable"
452    // rule rejects a cwd-shifted one, and a same-cwd one silently merges). The
453    // fix is to derive a child id from the path - keyed off the `subagents/`
454    // ancestor at any depth - and link back via `parent_session_id`. See
455    // spec.md#datasets.
456    let subagent = subagent_descriptor(path);
457    let project_dir = source_project_dir(path, subagent.is_some());
458    let (session_id, parent_session_id, source_agent, subagent_options) = match subagent {
459        Some(SubagentDescriptor {
460            parent_uuid,
461            child_suffix,
462            agent_hash,
463            agent_type,
464            meta,
465        }) => {
466            let child_id = format!("{parent_uuid}/{child_suffix}");
467            let agent_label = agent_type
468                .as_deref()
469                .map(|t| format!("claude-code/{t}"))
470                .unwrap_or_else(|| "claude-code/subagent".to_owned());
471            // `meta` is the verbatim `.meta.json`; `hash` and `raw_session_id`
472            // are pond-derived (filename hash + parent sessionId). Storing the
473            // whole meta keeps native restore of the sidecar lossless.
474            let metadata = json!({
475                "hash": agent_hash,
476                "raw_session_id": raw_session_id,
477                "meta": meta,
478            });
479            (child_id, Some(parent_uuid), agent_label, Some(metadata))
480        }
481        None => (raw_session_id, None, "claude-code".to_owned(), None),
482    };
483
484    let project = match project {
485        Some(value) => value,
486        None => {
487            let decoded = path
488                .parent()
489                .and_then(|p| p.file_name())
490                .and_then(|n| n.to_str())
491                .map(|s| s.replace('-', "/"))
492                .ok_or_else(|| {
493                    AdapterError::schema(
494                        NAME,
495                        path_display.clone(),
496                        "no `cwd` field in any row and source path is not UTF-8",
497                    )
498                })?;
499            extract_self_str(&Value::String(decoded)).ok_or_else(|| {
500                AdapterError::schema(
501                    NAME,
502                    path_display.clone(),
503                    "internal: Value::String produced None from Source::as_str",
504                )
505            })?
506        }
507    };
508
509    let mut options = ProviderOptions::new();
510    options.insert(
511        "source".to_owned(),
512        json!({
513            "adapter": "claude-code",
514            "version": version,
515            "project_dir": project_dir,
516            "workspace_path": &*project,
517        }),
518    );
519    if let Some(metadata) = subagent_options {
520        options.insert("subagent".to_owned(), metadata);
521    }
522
523    Ok(Session {
524        id: session_id,
525        parent_session_id,
526        parent_message_id: None,
527        source_agent,
528        created_at,
529        project,
530        options,
531    })
532}
533
534fn source_project_dir(path: &Path, is_subagent: bool) -> Option<String> {
535    // The project dir is the grandparent of `subagents/` regardless of how
536    // deeply the transcript nests below it (`.../<project>/<parent_uuid>/
537    // subagents/...`), so climb from the `subagents/` ancestor rather than a
538    // fixed number of `.parent()` hops.
539    let project_dir = if is_subagent {
540        subagents_dir(path)?.parent()?.parent()
541    } else {
542        path.parent()
543    };
544    project_dir
545        .and_then(|p| p.file_name())
546        .and_then(|n| n.to_str())
547        .map(ToOwned::to_owned)
548}
549
/// The nearest ancestor directory of `path` that is named `subagents`, if
/// any. Depth-independent: matches both the flat
/// `<parent_uuid>/subagents/agent-<hash>.jsonl` layout and the nested workflow
/// `<parent_uuid>/subagents/workflows/<wf-id>/agent-<hash>.jsonl` layout. The
/// directory directly above it is the parent session uuid. `path` itself is
/// never considered, only its ancestors.
fn subagents_dir(path: &Path) -> Option<&Path> {
    path.ancestors()
        .skip(1) // `ancestors` yields `path` itself first
        .find(|dir| dir.file_name().and_then(|name| name.to_str()) == Some("subagents"))
}
564
/// Resolved metadata for one subagent JSONL file. `agent_type` is read from
/// the sibling `.meta.json` for the `source_agent` label; `meta` keeps that
/// file's full verbatim content so native restore reproduces it
/// (spec.md#adapter-native-restore-lossless). Both are `None` when the meta file is
/// absent or unreadable (the label falls back to `claude-code/subagent`).
struct SubagentDescriptor {
    /// Name of the directory directly above `subagents/`.
    parent_uuid: String,
    /// Transcript path relative to `subagents/`, `.jsonl` stripped and
    /// `/`-separated.
    child_suffix: String,
    /// The `<hash>` in the `agent-<hash>.jsonl` file name.
    agent_hash: String,
    /// String value of the meta file's `agentType` key, when present.
    agent_type: Option<String>,
    /// The whole parsed `.meta.json`.
    meta: Option<Value>,
}
577
578/// `(parent_uuid, child_suffix, agent_hash)` for a subagent transcript, or
579/// `None` for any path without a `subagents/` ancestor or a non-`agent-<hash>`
580/// leaf (the common case: top-level session files). `child_suffix` is the file's
581/// path relative to its `subagents/` ancestor with `.jsonl` stripped -
582/// `agent-<hash>` flat, `workflows/<wf-id>/agent-<hash>` nested - so the derived
583/// child id `<parent_uuid>/<child_suffix>` round-trips back to the on-disk path
584/// on native restore. `agent_hash` keys the sibling `.meta.json` lookup.
585fn subagent_ids(path: &Path) -> Option<(String, String, String)> {
586    let file_name = path.file_name()?.to_str()?;
587    let agent_hash = file_name
588        .strip_prefix("agent-")?
589        .strip_suffix(".jsonl")?
590        .to_owned();
591    let subagents = subagents_dir(path)?;
592    let parent_uuid = subagents.parent()?.file_name()?.to_str()?.to_owned();
593    // The child id must be `/`-canonical on every platform (the rest of the
594    // adapter and `claude_relative_path` assume `/`), but a relative path carries
595    // the OS separator - normalize it. No-op on POSIX.
596    let child_suffix = path
597        .strip_prefix(subagents)
598        .ok()?
599        .with_extension("")
600        .to_str()?
601        .replace(std::path::MAIN_SEPARATOR, "/");
602    Some((parent_uuid, child_suffix, agent_hash))
603}
604
605/// [`subagent_ids`] plus the sibling `agent-<hash>.meta.json` - `agentType` for
606/// the `source_agent` label, the whole file for lossless sidecar restore.
607fn subagent_descriptor(path: &Path) -> Option<SubagentDescriptor> {
608    let (parent_uuid, child_suffix, agent_hash) = subagent_ids(path)?;
609    let meta_path = path.parent()?.join(format!("agent-{agent_hash}.meta.json"));
610    let (agent_type, meta) = match std::fs::read(&meta_path) {
611        Ok(bytes) => match serde_json::from_slice::<Value>(&bytes) {
612            Ok(value) => (
613                value
614                    .get("agentType")
615                    .and_then(Value::as_str)
616                    .map(ToOwned::to_owned),
617                Some(value),
618            ),
619            Err(error) => {
620                tracing::debug!(
621                    target: "pond::adapter::claude_code",
622                    meta = %meta_path.display(),
623                    %error,
624                    "subagent .meta.json present but unparseable; falling back to 'claude-code/subagent'",
625                );
626                (None, None)
627            }
628        },
629        Err(error) if error.kind() == std::io::ErrorKind::NotFound => (None, None),
630        Err(error) => {
631            tracing::debug!(
632                target: "pond::adapter::claude_code",
633                meta = %meta_path.display(),
634                %error,
635                "subagent .meta.json IO error; falling back to 'claude-code/subagent'",
636            );
637            (None, None)
638        }
639    };
640
641    Some(SubagentDescriptor {
642        parent_uuid,
643        child_suffix,
644        agent_hash,
645        agent_type,
646        meta,
647    })
648}
649
650fn events_from_row(
651    session_id: &str,
652    line: usize,
653    row: &Value,
654    default_timestamp: DateTime<Utc>,
655    state: &FileState,
656) -> Result<Vec<IngestEvent>, String> {
657    let timestamp = parse_timestamp(row).unwrap_or(default_timestamp);
658    let uuid = row
659        .get("uuid")
660        .and_then(Value::as_str)
661        .map_or_else(|| format!("{session_id}:{line}"), ToOwned::to_owned);
662
663    if let Some(message_value) = row.get("message") {
664        return message_events(
665            session_id,
666            &uuid,
667            timestamp,
668            row,
669            message_value,
670            state,
671            line,
672        );
673    }
674
675    // Rows with no `message` field are session-metadata records:
676    // `queue-operation`, `permission-mode`, `last-prompt`, `attachment`,
677    // `progress`, `system`, `custom-title`, etc. We preserve them as
678    // System messages with the row's compact JSON in `content` so a future
679    // exporter could reconstruct the original transcript; the `subtype`
680    // becomes the human label via `options.source.raw_type`.
681    let raw_type = row.get("type").and_then(Value::as_str);
682    let content = if raw_type == Some("attachment") {
683        row.get("attachment")
684            .and_then(attachment_content)
685            .or_else(|| Some(extract_compact_repr(row)))
686    } else {
687        extract_str(row, "subtype").or_else(|| extract_str(row, "type"))
688    };
689    let message = Message::System {
690        id: uuid,
691        session_id: session_id.to_owned(),
692        timestamp,
693        content,
694        options: row_options(row, line),
695    };
696    Ok(vec![IngestEvent::Message(message)])
697}
698
699fn message_events(
700    session_id: &str,
701    uuid: &str,
702    timestamp: DateTime<Utc>,
703    row: &Value,
704    message_value: &Value,
705    state: &FileState,
706    line: usize,
707) -> Result<Vec<IngestEvent>, String> {
708    let role = message_value
709        .get("role")
710        .and_then(Value::as_str)
711        .ok_or_else(|| "message missing role".to_owned())?;
712    let content = message_value.get("content").unwrap_or(&Value::Null);
713    let mut parts = Vec::new();
714    let message = match (role, content) {
715        ("user", Value::String(text)) => {
716            // spec.md#model-part-provenance: a user-slot turn is conversation only
717            // when it is a genuine human prompt; harness-injected wrappers and
718            // `isMeta` rows are scaffolding.
719            let provenance = user_text_provenance(row, text);
720            parts.push(text_part(
721                session_id,
722                uuid,
723                0,
724                extract_self_str(content),
725                provenance,
726            ));
727            Message::User {
728                id: uuid.to_owned(),
729                session_id: session_id.to_owned(),
730                timestamp,
731                options: row_options(row, line),
732            }
733        }
734        ("user", Value::Array(items)) if items.iter().all(is_tool_result) => {
735            let source_tool_result = row.get("toolUseResult").cloned();
736            parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
737                tool_result_part(
738                    session_id,
739                    uuid,
740                    ordinal,
741                    item,
742                    source_tool_result.as_ref(),
743                    state,
744                )
745            }));
746            Message::Tool {
747                id: uuid.to_owned(),
748                session_id: session_id.to_owned(),
749                timestamp,
750                options: row_options(row, line),
751            }
752        }
753        ("user", Value::Array(items)) => {
754            // Classify the whole user message once: v1 claude-code never mixes
755            // provenance within a single message (spec.md#model-part-provenance).
756            let provenance = user_array_provenance(row, items);
757            parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
758                user_part(session_id, uuid, ordinal, item, state, provenance)
759            }));
760            Message::User {
761                id: uuid.to_owned(),
762                session_id: session_id.to_owned(),
763                timestamp,
764                options: row_options(row, line),
765            }
766        }
767        ("assistant", Value::Array(items)) => {
768            parts.extend(
769                items
770                    .iter()
771                    .enumerate()
772                    .map(|(ordinal, item)| assistant_part(session_id, uuid, ordinal, item)),
773            );
774            Message::Assistant {
775                id: uuid.to_owned(),
776                session_id: session_id.to_owned(),
777                timestamp,
778                options: assistant_options(row, message_value, line),
779            }
780        }
781        ("system", Value::String(_)) => Message::System {
782            id: uuid.to_owned(),
783            session_id: session_id.to_owned(),
784            timestamp,
785            content: extract_self_str(content),
786            options: row_options(row, line),
787        },
788        ("system", _) => Message::System {
789            id: uuid.to_owned(),
790            session_id: session_id.to_owned(),
791            timestamp,
792            // Fallback for system messages without a string content: serialize
793            // the structured body as JSON. This is not a synthesized value
794            // (the row genuinely had this content), just a lossless string
795            // encoding of structured data.
796            content: Some(extract_compact_repr(message_value)),
797            options: row_options(row, line),
798        },
799        (other, _) => {
800            return Err(format!("unsupported message role {other}"));
801        }
802    };
803
804    let mut events = Vec::with_capacity(parts.len() + 1);
805    events.push(IngestEvent::Message(message));
806    events.extend(parts.into_iter().map(IngestEvent::Part));
807    Ok(events)
808}
809
810fn text_part(
811    session_id: &str,
812    message_id: &str,
813    ordinal: usize,
814    text: Option<Extracted<String>>,
815    provenance: Provenance,
816) -> Part {
817    Part {
818        session_id: session_id.to_owned(),
819        id: part_id(message_id, ordinal),
820        message_id: message_id.to_owned(),
821        ordinal: part_ordinal(ordinal),
822        provenance,
823        options: empty_options(),
824        kind: PartKind::Text { text },
825    }
826}
827
828fn user_part(
829    session_id: &str,
830    message_id: &str,
831    ordinal: usize,
832    value: &Value,
833    state: &FileState,
834    provenance: Provenance,
835) -> Part {
836    match value.get("type").and_then(Value::as_str) {
837        Some("text") => text_part(
838            session_id,
839            message_id,
840            ordinal,
841            extract_str(value, "text"),
842            provenance,
843        ),
844        Some("image") | Some("file") => {
845            file_part(session_id, message_id, ordinal, value, provenance)
846        }
847        Some("tool_result") => {
848            tool_result_part(session_id, message_id, ordinal, value, None, state)
849        }
850        // Unknown user part shapes: preserve the raw JSON in the Text slot
851        // rather than dropping. This is not a synthesized value - it's a
852        // lossless encoding of structured data the schema doesn't model.
853        _ => text_part(
854            session_id,
855            message_id,
856            ordinal,
857            Some(extract_compact_repr(value)),
858            provenance,
859        ),
860    }
861}
862
863fn assistant_part(session_id: &str, message_id: &str, ordinal: usize, value: &Value) -> Part {
864    // spec.md#model-part-provenance: assistant content - text, reasoning, tool calls -
865    // is model-authored, hence conversational. `tool_result` parts never appear
866    // on an assistant message.
867    match value.get("type").and_then(Value::as_str) {
868        Some("text") => text_part(
869            session_id,
870            message_id,
871            ordinal,
872            extract_str(value, "text"),
873            Provenance::Conversational,
874        ),
875        Some("thinking") => Part {
876            session_id: session_id.to_owned(),
877            id: part_id(message_id, ordinal),
878            message_id: message_id.to_owned(),
879            ordinal: part_ordinal(ordinal),
880            provenance: Provenance::Conversational,
881            options: signature_options(value),
882            kind: PartKind::Reasoning {
883                text: extract_str(value, "thinking"),
884            },
885        },
886        Some("tool_use") => Part {
887            session_id: session_id.to_owned(),
888            id: part_id(message_id, ordinal),
889            message_id: message_id.to_owned(),
890            ordinal: part_ordinal(ordinal),
891            provenance: Provenance::Conversational,
892            options: empty_options(),
893            kind: PartKind::ToolCall {
894                call_id: extract_str(value, "id"),
895                name: extract_str(value, "name"),
896                params: value.get("input").cloned().unwrap_or(Value::Null),
897                provider_executed: false,
898            },
899        },
900        Some("server_tool_use") => Part {
901            session_id: session_id.to_owned(),
902            id: part_id(message_id, ordinal),
903            message_id: message_id.to_owned(),
904            ordinal: part_ordinal(ordinal),
905            provenance: Provenance::Conversational,
906            options: empty_options(),
907            kind: PartKind::ToolCall {
908                call_id: extract_str(value, "id"),
909                name: extract_str(value, "name"),
910                params: value.get("input").cloned().unwrap_or(Value::Null),
911                provider_executed: true,
912            },
913        },
914        Some("image") | Some("file") => file_part(
915            session_id,
916            message_id,
917            ordinal,
918            value,
919            Provenance::Conversational,
920        ),
921        // Same rationale as `user_part`'s fallback: lossless encoding of
922        // an unrecognised structured shape, not synthesised data.
923        _ => text_part(
924            session_id,
925            message_id,
926            ordinal,
927            Some(extract_compact_repr(value)),
928            Provenance::Conversational,
929        ),
930    }
931}
932
933fn tool_result_part(
934    session_id: &str,
935    message_id: &str,
936    ordinal: usize,
937    value: &Value,
938    source_tool_result: Option<&Value>,
939    state: &FileState,
940) -> Part {
941    let call_id = extract_str(value, "tool_use_id");
942    // `tool_result` source rows don't carry the tool name; it's resolved
943    // via the per-file `tool_use_id -> name` map. Misses (compaction pruned
944    // the originating `tool_use`) surface as `None` per spec.md#model-no-synthesis
945    // (schema-honesty: the field is `Option<Extracted<T>>`, not a fabricated
946    // string).
947    let name = value
948        .str_field("tool_use_id")
949        .and_then(|id| state.tool_call_names.get(id))
950        .cloned();
951    let result = value
952        .get("content")
953        .cloned()
954        .or_else(|| source_tool_result.cloned())
955        .unwrap_or(Value::Null);
956    Part {
957        session_id: session_id.to_owned(),
958        id: part_id(message_id, ordinal),
959        message_id: message_id.to_owned(),
960        ordinal: part_ordinal(ordinal),
961        // spec.md#model-part-provenance: tool output is runtime-produced, not
962        // conversation.
963        provenance: Provenance::Injected,
964        options: empty_options(),
965        kind: PartKind::ToolResult {
966            call_id,
967            name,
968            is_failure: value
969                .get("is_error")
970                .and_then(Value::as_bool)
971                .unwrap_or(false),
972            result,
973        },
974    }
975}
976
977fn file_part(
978    session_id: &str,
979    message_id: &str,
980    ordinal: usize,
981    value: &Value,
982    provenance: Provenance,
983) -> Part {
984    let media_type = value
985        .get("media_type")
986        .or_else(|| value.get("mime_type"))
987        .and_then(Value::as_str)
988        .map(ToOwned::to_owned);
989    let file_name = value
990        .get("file_name")
991        .or_else(|| value.get("name"))
992        .and_then(Value::as_str)
993        .map(ToOwned::to_owned);
994    let data = if let Some(source) = value.get("source") {
995        if let Some(url) = source.get("url").and_then(Value::as_str) {
996            FileData::Url(url.to_owned())
997        } else if let Some(bytes) = source.get("data").and_then(Value::as_str) {
998            FileData::String(bytes.to_owned())
999        } else {
1000            FileData::String(compact_json(source))
1001        }
1002    } else if let Some(url) = value.get("url").and_then(Value::as_str) {
1003        FileData::Url(url.to_owned())
1004    } else {
1005        FileData::String(compact_json(value))
1006    };
1007
1008    Part {
1009        session_id: session_id.to_owned(),
1010        id: part_id(message_id, ordinal),
1011        message_id: message_id.to_owned(),
1012        ordinal: part_ordinal(ordinal),
1013        provenance,
1014        options: empty_options(),
1015        kind: PartKind::File {
1016            media_type,
1017            file_name,
1018            data,
1019        },
1020    }
1021}
1022
1023fn row_options(row: &Value, line: usize) -> ProviderOptions {
1024    let mut options = ProviderOptions::new();
1025    let source = json!({
1026        "line": line,
1027        "parent_uuid": row.get("parentUuid"),
1028        "is_sidechain": row.get("isSidechain"),
1029        "user_type": row.get("userType"),
1030        "entrypoint": row.get("entrypoint"),
1031        "cwd": row.get("cwd"),
1032        "version": row.get("version"),
1033        "git_branch": row.get("gitBranch"),
1034        "request_id": row.get("requestId"),
1035        "raw_type": row.get("type"),
1036        "raw_record": extract_raw_record(row),
1037    });
1038    options.insert("source".to_owned(), source);
1039    options
1040}
1041
1042fn assistant_options(row: &Value, message_value: &Value, line: usize) -> ProviderOptions {
1043    let mut options = row_options(row, line);
1044    let anthropic = json!({
1045        "id": message_value.get("id"),
1046        "model": message_value.get("model"),
1047        "stop_reason": message_value.get("stop_reason"),
1048        "stop_sequence": message_value.get("stop_sequence"),
1049        "usage": message_value.get("usage"),
1050    });
1051    options.insert("anthropic".to_owned(), anthropic);
1052    options
1053}
1054
1055fn signature_options(value: &Value) -> ProviderOptions {
1056    let mut options = ProviderOptions::new();
1057    if let Some(signature) = value.get("signature").and_then(Value::as_str) {
1058        options.insert("anthropic".to_owned(), json!({"signature": signature}));
1059    }
1060    options
1061}
1062
1063fn attachment_content(value: &Value) -> Option<Extracted<String>> {
1064    extract_str(value, "content").or_else(|| extract_str(value, "stdout"))
1065}
1066
1067fn parse_timestamp(value: &Value) -> anyhow::Result<DateTime<Utc>> {
1068    let timestamp = value
1069        .get("timestamp")
1070        .and_then(Value::as_str)
1071        .context("missing timestamp")?;
1072    Ok(DateTime::parse_from_rfc3339(timestamp)
1073        .context("invalid timestamp")?
1074        .with_timezone(&Utc))
1075}
1076
1077fn is_tool_result(value: &Value) -> bool {
1078    value.get("type").and_then(Value::as_str) == Some("tool_result")
1079}
1080
1081/// True when the row carries `isMeta: true` - claude-code's marker for an
1082/// expanded skill or command body injected into a user slot.
1083fn is_meta_row(row: &Value) -> bool {
1084    row.get("isMeta").and_then(Value::as_bool) == Some(true)
1085}
1086
/// True when `text`, ignoring leading whitespace, starts with one of the
/// wrappers the claude-code harness places inside a user-slot turn
/// (spec.md#model-part-provenance): task notifications, slash-command echoes,
/// local-command caveats and output, interrupt notices.
fn is_injected_user_text(text: &str) -> bool {
    const WRAPPER_PREFIXES: [&str; 7] = [
        "<task-notification>",
        "<command-name>",
        "<command-message>",
        "<command-args>",
        "<local-command-caveat>",
        "<local-command-stdout>",
        "[Request interrupted by user",
    ];

    let body = text.trim_start();
    WRAPPER_PREFIXES
        .iter()
        .any(|prefix| body.starts_with(*prefix))
}
1100
1101/// Provenance of a string-content user message: `injected` for an `isMeta`
1102/// row or a harness wrapper, `conversational` for a genuine human prompt.
1103fn user_text_provenance(row: &Value, text: &str) -> Provenance {
1104    if is_meta_row(row) || is_injected_user_text(text) {
1105        Provenance::Injected
1106    } else {
1107        Provenance::Conversational
1108    }
1109}
1110
1111/// Provenance of an array-content user message. `isMeta` flags the whole row;
1112/// otherwise a leading text item carrying a harness wrapper marks it injected.
1113/// v1 claude-code never interleaves both within one message.
1114fn user_array_provenance(row: &Value, items: &[Value]) -> Provenance {
1115    if is_meta_row(row) {
1116        return Provenance::Injected;
1117    }
1118    let wrapped = items.iter().any(|item| {
1119        item.get("type").and_then(Value::as_str) == Some("text")
1120            && item
1121                .get("text")
1122                .and_then(Value::as_str)
1123                .is_some_and(is_injected_user_text)
1124    });
1125    if wrapped {
1126        Provenance::Injected
1127    } else {
1128        Provenance::Conversational
1129    }
1130}
1131
1132#[cfg(test)]
1133mod tests {
1134    //! Conformance tests for the claude-code adapter's data-shape contract:
1135    //! subagent path derivation, replay dedup, tool-name resolution, and the
1136    //! "no synthesized values" invariant (spec.md#model-no-synthesis, spec.md#model-schema-honesty, and spec.md#model-lossless-projection).
1137    //!
1138    //! Each test builds a tiny synthetic corpus under a `TempDir` so the
1139    //! assertions exercise the real adapter end-to-end without depending on
1140    //! committed fixtures.
1141    #![allow(clippy::expect_used, clippy::unwrap_used)]
1142
1143    use super::*;
1144    use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
1145    use tempfile::TempDir;
1146
1147    const FIXTURE_ROOT: &str = "tests/fixtures/adapter/claude_code/projects";
1148
1149    #[test]
1150    fn probe_default_finds_claude_projects_under_home() -> anyhow::Result<()> {
1151        crate::adapter::test_support::assert_probe_default(
1152            &ClaudeCodeFactory,
1153            &[".claude", "projects"],
1154        )
1155    }
1156
1157    #[tokio::test(flavor = "multi_thread")]
1158    async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
1159        let adapter = ClaudeCodeAdapter::new(FIXTURE_ROOT);
1160        crate::adapter::test_support::assert_native_restore(
1161            &ClaudeCodeFactory,
1162            &adapter,
1163            std::path::Path::new(FIXTURE_ROOT),
1164        )
1165        .await
1166    }
1167
1168    /// `<root>/<encoded-cwd>/<parent_uuid>.jsonl` plus
1169    /// `<root>/<encoded-cwd>/<parent_uuid>/subagents/agent-<hash>.jsonl` plus
1170    /// `agent-<hash>.meta.json`. The subagent file must:
1171    ///   - emit a Session whose `id = "{parent_uuid}/agent-{hash}"`
1172    ///   - have `parent_session_id = Some(parent_uuid)`
1173    ///   - have `source_agent = "claude-code/{agentType}"` from the meta file
1174    ///   - have `options.subagent` carrying the hash + agent_type + description
1175    #[tokio::test(flavor = "multi_thread")]
1176    async fn subagent_file_derives_child_session_with_parent_link() -> anyhow::Result<()> {
1177        let corpus = TempDir::new()?;
1178        let project_dir = corpus.path().join("-tmp-pond-test");
1179        let parent_uuid = "11111111-1111-1111-1111-111111111111";
1180        let agent_hash = "abc123def456";
1181        std::fs::create_dir_all(project_dir.join(parent_uuid).join("subagents"))?;
1182
1183        // Parent session file (one user row to anchor a Session).
1184        let parent_row = serde_json::json!({
1185            "type": "user",
1186            "uuid": "u-parent-1",
1187            "sessionId": parent_uuid,
1188            "cwd": "/tmp/pond-test",
1189            "timestamp": "2026-05-16T00:00:00.000Z",
1190            "version": "2.1.121",
1191            "message": {"role": "user", "content": "hi parent"},
1192        });
1193        std::fs::write(
1194            project_dir.join(format!("{parent_uuid}.jsonl")),
1195            format!("{parent_row}\n"),
1196        )?;
1197
1198        // Subagent file + sibling meta. Carries the SAME sessionId as the parent
1199        // in row content; the adapter must derive a child id from the path.
1200        let subagent_row = serde_json::json!({
1201            "type": "user",
1202            "uuid": "u-sub-1",
1203            "sessionId": parent_uuid,
1204            "cwd": "/tmp/pond-test",
1205            "isSidechain": true,
1206            "agentId": agent_hash,
1207            "timestamp": "2026-05-16T00:01:00.000Z",
1208            "version": "2.1.121",
1209            "message": {"role": "user", "content": "subagent prompt"},
1210        });
1211        std::fs::write(
1212            project_dir
1213                .join(parent_uuid)
1214                .join("subagents")
1215                .join(format!("agent-{agent_hash}.jsonl")),
1216            format!("{subagent_row}\n"),
1217        )?;
1218        std::fs::write(
1219            project_dir
1220                .join(parent_uuid)
1221                .join("subagents")
1222                .join(format!("agent-{agent_hash}.meta.json")),
1223            r#"{"agentType":"general-purpose","description":"do a thing"}"#,
1224        )?;
1225
1226        let store_dir = TempDir::new()?;
1227        let store = Store::open_local(store_dir.path()).await?;
1228        let adapter = ClaudeCodeAdapter::new(corpus.path());
1229
1230        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1231        assert_eq!(
1232            summary.dropped_sessions, 0,
1233            "subagent file must NOT collide with parent (pre-fix this was the project-immutable rejection)"
1234        );
1235
1236        let parent = store
1237            .get_session(parent_uuid)
1238            .await?
1239            .expect("parent session should ingest as the bare uuid");
1240        assert_eq!(parent.session.source_agent, "claude-code");
1241        assert_eq!(parent.session.parent_session_id, None);
1242
1243        let child_id = format!("{parent_uuid}/agent-{agent_hash}");
1244        let child = store
1245            .get_session(&child_id)
1246            .await?
1247            .expect("subagent session must surface under the derived id");
1248        assert_eq!(
1249            child.session.source_agent, "claude-code/general-purpose",
1250            "agent_type from .meta.json should suffix the source_agent label"
1251        );
1252        assert_eq!(
1253            child.session.parent_session_id.as_deref(),
1254            Some(parent_uuid),
1255            "subagent must link back to parent via parent_session_id",
1256        );
1257        let subagent_meta = child
1258            .session
1259            .options
1260            .get("subagent")
1261            .expect("options.subagent must carry the hash + verbatim meta.json");
1262        assert_eq!(subagent_meta["hash"], serde_json::json!(agent_hash));
1263        assert_eq!(
1264            subagent_meta["meta"]["agentType"],
1265            serde_json::json!("general-purpose")
1266        );
1267        assert_eq!(
1268            subagent_meta["meta"]["description"],
1269            serde_json::json!("do a thing")
1270        );
1271        Ok(())
1272    }
1273
1274    /// Subagent file present but the sibling `.meta.json` is missing. The
1275    /// adapter must still derive a child session (so it doesn't collide with
1276    /// the parent) and fall back to `source_agent = "claude-code/subagent"`.
1277    #[tokio::test(flavor = "multi_thread")]
1278    async fn subagent_without_meta_falls_back_to_generic_label() -> anyhow::Result<()> {
1279        let corpus = TempDir::new()?;
1280        let project_dir = corpus.path().join("-tmp-pond-test");
1281        let parent_uuid = "22222222-2222-2222-2222-222222222222";
1282        let agent_hash = "deadbeef";
1283        std::fs::create_dir_all(project_dir.join(parent_uuid).join("subagents"))?;
1284        let row = serde_json::json!({
1285            "type": "user",
1286            "uuid": "u-sub-only",
1287            "sessionId": parent_uuid,
1288            "cwd": "/tmp/pond-test",
1289            "timestamp": "2026-05-16T00:00:00.000Z",
1290            "message": {"role": "user", "content": "no meta sibling here"},
1291        });
1292        std::fs::write(
1293            project_dir
1294                .join(parent_uuid)
1295                .join("subagents")
1296                .join(format!("agent-{agent_hash}.jsonl")),
1297            format!("{row}\n"),
1298        )?;
1299
1300        let store_dir = TempDir::new()?;
1301        let store = Store::open_local(store_dir.path()).await?;
1302        let adapter = ClaudeCodeAdapter::new(corpus.path());
1303        let _summary =
1304            ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1305
1306        let child = store
1307            .get_session(&format!("{parent_uuid}/agent-{agent_hash}"))
1308            .await?
1309            .expect("derived child id even without meta");
1310        assert_eq!(child.session.source_agent, "claude-code/subagent");
1311        Ok(())
1312    }
1313
1314    /// Nested workflow-runner subagent:
1315    ///   `<parent_uuid>/subagents/workflows/<wf-id>/agent-<hash>.jsonl`.
1316    /// Same parent `sessionId` in row content AND a shifted `cwd`. The adapter
1317    /// must derive a distinct child id from the FULL path under `subagents/`
1318    /// (not collapse onto the parent), so it neither collides on the immutable
1319    /// `project` nor silently merges into the parent. Regression for the
1320    /// workflow-layout sync rejection. See spec.md#datasets.
1321    #[tokio::test(flavor = "multi_thread")]
1322    async fn workflow_nested_subagent_derives_distinct_child_not_parent_collision()
1323    -> anyhow::Result<()> {
1324        let corpus = TempDir::new()?;
1325        let project_dir = corpus.path().join("-tmp-pond-test");
1326        let parent_uuid = "44444444-4444-4444-4444-444444444444";
1327        let wf_id = "wf_abcd1234-ef0";
1328        let agent_hash = "cafef00dbaadf00d1";
1329        let wf_dir = project_dir
1330            .join(parent_uuid)
1331            .join("subagents")
1332            .join("workflows")
1333            .join(wf_id);
1334        std::fs::create_dir_all(&wf_dir)?;
1335
1336        let parent_row = serde_json::json!({
1337            "type": "user",
1338            "uuid": "u-parent-1",
1339            "sessionId": parent_uuid,
1340            "cwd": "/tmp/pond-test",
1341            "timestamp": "2026-05-20T00:00:00.000Z",
1342            "message": {"role": "user", "content": "hi parent"},
1343        });
1344        std::fs::write(
1345            project_dir.join(format!("{parent_uuid}.jsonl")),
1346            format!("{parent_row}\n"),
1347        )?;
1348
1349        // Shifted cwd: pre-fix this collided with the parent's immutable project.
1350        let subagent_row = serde_json::json!({
1351            "type": "user",
1352            "uuid": "u-wf-sub-1",
1353            "sessionId": parent_uuid,
1354            "cwd": "/tmp/pond-test/packages/sub",
1355            "isSidechain": true,
1356            "agentId": agent_hash,
1357            "timestamp": "2026-05-20T00:01:00.000Z",
1358            "message": {"role": "user", "content": "workflow subagent prompt"},
1359        });
1360        std::fs::write(
1361            wf_dir.join(format!("agent-{agent_hash}.jsonl")),
1362            format!("{subagent_row}\n"),
1363        )?;
1364        std::fs::write(
1365            wf_dir.join(format!("agent-{agent_hash}.meta.json")),
1366            r#"{"agentType":"general-purpose","description":"workflow child"}"#,
1367        )?;
1368
1369        let store_dir = TempDir::new()?;
1370        let store = Store::open_local(store_dir.path()).await?;
1371        let adapter = ClaudeCodeAdapter::new(corpus.path());
1372        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1373        assert_eq!(
1374            summary.dropped_sessions, 0,
1375            "nested workflow subagent must NOT collide with the parent project",
1376        );
1377
1378        let parent = store
1379            .get_session(parent_uuid)
1380            .await?
1381            .expect("parent session ingests under the bare uuid");
1382        assert_eq!(&*parent.session.project, "/tmp/pond-test");
1383        assert_eq!(parent.session.parent_session_id, None);
1384
1385        let child_id = format!("{parent_uuid}/workflows/{wf_id}/agent-{agent_hash}");
1386        let child = store
1387            .get_session(&child_id)
1388            .await?
1389            .expect("workflow subagent surfaces under the full nested child id");
1390        assert_eq!(child.session.source_agent, "claude-code/general-purpose");
1391        assert_eq!(
1392            child.session.parent_session_id.as_deref(),
1393            Some(parent_uuid)
1394        );
1395        assert_eq!(
1396            &*child.session.project, "/tmp/pond-test/packages/sub",
1397            "child keeps its own cwd-derived project, distinct from the parent",
1398        );
1399        let subagent_meta = child
1400            .session
1401            .options
1402            .get("subagent")
1403            .expect("options.subagent present");
1404        assert_eq!(subagent_meta["hash"], serde_json::json!(agent_hash));
1405        Ok(())
1406    }
1407
1408    /// A `.jsonl` under `subagents/` whose leaf is NOT `agent-<hash>.jsonl` (a
1409    /// layout this pond version doesn't understand) must FAIL VISIBLY rather
1410    /// than fall back to its content `sessionId` (the parent's) and silently
1411    /// merge into the parent session. It is counted as an unsupported skip and
1412    /// contributes no rows. See spec.md#datasets.
1413    #[tokio::test(flavor = "multi_thread")]
1414    async fn unrecognized_subagents_file_fails_visibly_not_merged() -> anyhow::Result<()> {
1415        let corpus = TempDir::new()?;
1416        let project_dir = corpus.path().join("-tmp-pond-test");
1417        let parent_uuid = "55555555-5555-5555-5555-555555555555";
1418        let unknown_dir = project_dir
1419            .join(parent_uuid)
1420            .join("subagents")
1421            .join("workflows")
1422            .join("wf_future01-aaa");
1423        std::fs::create_dir_all(&unknown_dir)?;
1424
1425        let parent_row = serde_json::json!({
1426            "type": "user",
1427            "uuid": "u-parent-only",
1428            "sessionId": parent_uuid,
1429            "cwd": "/tmp/pond-test",
1430            "timestamp": "2026-05-20T00:00:00.000Z",
1431            "message": {"role": "user", "content": "parent message"},
1432        });
1433        std::fs::write(
1434            project_dir.join(format!("{parent_uuid}.jsonl")),
1435            format!("{parent_row}\n"),
1436        )?;
1437
1438        // Same parent sessionId AND same cwd: pre-guard this would have merged
1439        // silently into the parent. The leaf name is not `agent-<hash>.jsonl`.
1440        let unknown_row = serde_json::json!({
1441            "type": "user",
1442            "uuid": "u-should-not-merge",
1443            "sessionId": parent_uuid,
1444            "cwd": "/tmp/pond-test",
1445            "timestamp": "2026-05-20T00:02:00.000Z",
1446            "message": {"role": "user", "content": "must not land under parent"},
1447        });
1448        std::fs::write(
1449            unknown_dir.join("transcript-001.jsonl"),
1450            format!("{unknown_row}\n"),
1451        )?;
1452
1453        let store_dir = TempDir::new()?;
1454        let store = Store::open_local(store_dir.path()).await?;
1455        let adapter = ClaudeCodeAdapter::new(corpus.path());
1456        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1457
1458        assert_eq!(
1459            summary.skipped_files, 1,
1460            "the unrecognized subagents/ transcript must be a visible, counted skip",
1461        );
1462        let parent = store
1463            .get_session(parent_uuid)
1464            .await?
1465            .expect("parent session ingests");
1466        assert_eq!(
1467            parent.messages.len(),
1468            1,
1469            "the unrecognized file's row must NOT be merged into the parent session",
1470        );
1471        assert!(
1472            parent
1473                .messages
1474                .iter()
1475                .all(|m| m.message.id() != "u-should-not-merge"),
1476            "parent must not absorb the unrecognized file's message",
1477        );
1478        Ok(())
1479    }
1480
1481    /// Re-sync visibility: an unrecognized `subagents/` file must STILL surface as
1482    /// a visible `Unsupported` skip when the parent already carries a freshness
1483    /// watermark. Its content `sessionId` is the parent's, so peeking it would let
1484    /// the freshness gate skip the file as `Fresh` under the parent's watermark and
1485    /// hide the failure. `peek_session_id` returns `None` for it instead, keeping
1486    /// it out of the gate. Regression for the re-sync visibility leak. See
1487    /// spec.md#datasets.
1488    #[tokio::test(flavor = "multi_thread")]
1489    async fn unrecognized_subagents_file_stays_visible_under_parent_watermark() -> anyhow::Result<()>
1490    {
1491        struct ParentAlreadyFresh;
1492        impl crate::adapter::SkipOracle for ParentAlreadyFresh {
1493            fn last_ingested_at(&self, _session_id: &str) -> Option<DateTime<Utc>> {
1494                // Far-future watermark: every source mtime is `<= ingested`, so a
1495                // peeked id WOULD trip the freshness gate. The guard must keep the
1496                // unrecognized file out of the gate regardless.
1497                Some(
1498                    DateTime::parse_from_rfc3339("2999-01-01T00:00:00Z")
1499                        .unwrap()
1500                        .with_timezone(&Utc),
1501                )
1502            }
1503            fn is_empty(&self) -> bool {
1504                false
1505            }
1506        }
1507
1508        let corpus = TempDir::new()?;
1509        let project_dir = corpus.path().join("-tmp-pond-test");
1510        let parent_uuid = "66666666-6666-6666-6666-666666666666";
1511        let unknown_dir = project_dir
1512            .join(parent_uuid)
1513            .join("subagents")
1514            .join("workflows")
1515            .join("wf_future02-bbb");
1516        std::fs::create_dir_all(&unknown_dir)?;
1517
1518        let parent_row = serde_json::json!({
1519            "type": "user",
1520            "uuid": "u-parent-fresh",
1521            "sessionId": parent_uuid,
1522            "cwd": "/tmp/pond-test",
1523            "timestamp": "2026-05-20T00:00:00.000Z",
1524            "message": {"role": "user", "content": "parent message"},
1525        });
1526        std::fs::write(
1527            project_dir.join(format!("{parent_uuid}.jsonl")),
1528            format!("{parent_row}\n"),
1529        )?;
1530
1531        // Same parent sessionId, leaf not `agent-<hash>.jsonl`: pre-fix this would
1532        // peek the parent's id and be fresh-skipped under the far-future watermark.
1533        let unknown_row = serde_json::json!({
1534            "type": "user",
1535            "uuid": "u-resync-should-stay-visible",
1536            "sessionId": parent_uuid,
1537            "cwd": "/tmp/pond-test",
1538            "timestamp": "2026-05-20T00:02:00.000Z",
1539            "message": {"role": "user", "content": "must stay visible"},
1540        });
1541        std::fs::write(
1542            unknown_dir.join("transcript-002.jsonl"),
1543            format!("{unknown_row}\n"),
1544        )?;
1545
1546        let store_dir = TempDir::new()?;
1547        let store = Store::open_local(store_dir.path()).await?;
1548        let adapter = ClaudeCodeAdapter::new(corpus.path());
1549        let summary = ingest_adapter(&store, &adapter, &ParentAlreadyFresh, |_| {}).await?;
1550
1551        assert_eq!(
1552            summary.skipped_files, 1,
1553            "the unrecognized transcript must stay a visible Unsupported skip, not be fresh-skipped under the parent's watermark",
1554        );
1555        // The parent file legitimately fresh-skips under the far-future watermark;
1556        // the unrecognized file must NOT join it (pre-fix `skipped_fresh` would be 2).
1557        assert_eq!(
1558            summary.skipped_fresh, 1,
1559            "only the parent may fresh-skip; the unrecognized file must not borrow its watermark",
1560        );
1561        Ok(())
1562    }
1563
1564    /// Three rows with the same `uuid` (the claude-code `/resume` replay
1565    /// pattern). The adapter must dedupe at the file-state level so the
1566    /// validator never sees the duplicates; `dropped_events` stays 0 and
1567    /// `inserted` covers the single canonical row.
1568    #[tokio::test(flavor = "multi_thread")]
1569    async fn replay_duplicates_are_dedup_at_adapter_layer() -> anyhow::Result<()> {
1570        let corpus = TempDir::new()?;
1571        let project_dir = corpus.path().join("-tmp-pond-test");
1572        std::fs::create_dir_all(&project_dir)?;
1573        let session_uuid = "33333333-3333-3333-3333-333333333333";
1574        let dup_uuid = "u-shared-1";
1575        let row = serde_json::json!({
1576            "type": "user",
1577            "uuid": dup_uuid,
1578            "sessionId": session_uuid,
1579            "cwd": "/tmp/pond-test",
1580            "timestamp": "2026-05-16T00:00:00.000Z",
1581            "message": {"role": "user", "content": "replayed three times"},
1582        });
1583        // Three identical rows back-to-back, same uuid.
1584        let body = format!("{row}\n{row}\n{row}\n");
1585        std::fs::write(project_dir.join(format!("{session_uuid}.jsonl")), body)?;
1586
1587        let store_dir = TempDir::new()?;
1588        let store = Store::open_local(store_dir.path()).await?;
1589        let adapter = ClaudeCodeAdapter::new(corpus.path());
1590        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1591
1592        assert_eq!(
1593            summary.dropped_events, 0,
1594            "adapter must dedupe replays before they reach the validator"
1595        );
1596        assert!(
1597            !summary
1598                .drop_reasons
1599                .contains_key(crate::sessions::DROP_REASON_DUPLICATE_MESSAGE_ID),
1600            "duplicate_message_id bucket stays empty when adapter does its job"
1601        );
1602        Ok(())
1603    }
1604
1605    /// One assistant `tool_use` followed by a user `tool_result` in the same
1606    /// file. The adapter's per-file `tool_use_id -> name` map must resolve the
1607    /// result's tool name to the call's name. Pre-fix: synthesized `"unknown"`.
1608    #[tokio::test(flavor = "multi_thread")]
1609    async fn tool_result_name_resolves_from_prior_tool_use_in_same_file() -> anyhow::Result<()> {
1610        let corpus = TempDir::new()?;
1611        let project_dir = corpus.path().join("-tmp-pond-test");
1612        std::fs::create_dir_all(&project_dir)?;
1613        let session_uuid = "44444444-4444-4444-4444-444444444444";
1614        let call_id = "toolu_test_01";
1615
1616        let tool_use_row = serde_json::json!({
1617            "type": "assistant",
1618            "uuid": "u-call",
1619            "sessionId": session_uuid,
1620            "cwd": "/tmp/pond-test",
1621            "timestamp": "2026-05-16T00:00:00.000Z",
1622            "message": {
1623                "role": "assistant",
1624                "content": [{
1625                    "type": "tool_use",
1626                    "id": call_id,
1627                    "name": "Edit",
1628                    "input": {"file_path": "/tmp/foo"},
1629                }],
1630            },
1631        });
1632        let tool_result_row = serde_json::json!({
1633            "type": "user",
1634            "uuid": "u-result",
1635            "sessionId": session_uuid,
1636            "cwd": "/tmp/pond-test",
1637            "timestamp": "2026-05-16T00:00:01.000Z",
1638            "message": {
1639                "role": "user",
1640                "content": [{
1641                    "type": "tool_result",
1642                    "tool_use_id": call_id,
1643                    "content": "ok",
1644                }],
1645            },
1646        });
1647        std::fs::write(
1648            project_dir.join(format!("{session_uuid}.jsonl")),
1649            format!("{tool_use_row}\n{tool_result_row}\n"),
1650        )?;
1651
1652        let store_dir = TempDir::new()?;
1653        let store = Store::open_local(store_dir.path()).await?;
1654        let adapter = ClaudeCodeAdapter::new(corpus.path());
1655        let _summary =
1656            ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1657        let session = store
1658            .get_session(session_uuid)
1659            .await?
1660            .expect("session ingests");
1661
1662        let mut saw_call = false;
1663        let mut saw_result = false;
1664        for stored in &session.messages {
1665            for part in &stored.parts {
1666                match &part.kind {
1667                    PartKind::ToolCall {
1668                        call_id: cid, name, ..
1669                    } => {
1670                        assert_eq!(cid.as_ref().map(|e| e.as_str()), Some(call_id));
1671                        assert_eq!(
1672                            name.as_ref().map(|e| e.as_str()),
1673                            Some("Edit"),
1674                            "tool_use carries the name directly"
1675                        );
1676                        saw_call = true;
1677                    }
1678                    PartKind::ToolResult {
1679                        call_id: cid, name, ..
1680                    } => {
1681                        assert_eq!(cid.as_ref().map(|e| e.as_str()), Some(call_id));
1682                        assert_eq!(
1683                            name.as_ref().map(|e| e.as_str()),
1684                            Some("Edit"),
1685                            "tool_result resolves the name via the per-file map (was 'unknown' pre-2026-05-16)"
1686                        );
1687                        saw_result = true;
1688                    }
1689                    _ => {}
1690                }
1691            }
1692        }
1693        assert!(saw_call && saw_result, "both parts must be present");
1694        Ok(())
1695    }
1696
1697    /// spec.md#model-part-provenance: a genuine human prompt classifies
1698    /// `conversational`; a harness `<task-notification>` user-slot turn and an
1699    /// `isMeta` row classify `injected`.
1700    #[test]
1701    fn user_text_provenance_separates_prompts_from_harness_injection() {
1702        let prompt = json!({"type": "user", "uuid": "u1"});
1703        assert_eq!(
1704            user_text_provenance(&prompt, "please refactor the parser"),
1705            Provenance::Conversational,
1706        );
1707
1708        let notification = json!({"type": "user", "uuid": "u2"});
1709        assert_eq!(
1710            user_text_provenance(
1711                &notification,
1712                "<task-notification>background task done</task-notification>",
1713            ),
1714            Provenance::Injected,
1715        );
1716
1717        let meta = json!({"type": "user", "uuid": "u3", "isMeta": true});
1718        assert_eq!(
1719            user_text_provenance(&meta, "expanded skill body"),
1720            Provenance::Injected,
1721        );
1722    }
1723
1724    /// Ingest a session carrying a `<task-notification>` user message and a
1725    /// genuine prompt; the notification's part must be `injected` and the
1726    /// prompt's `conversational` (spec.md#model-part-provenance).
1727    #[tokio::test(flavor = "multi_thread")]
1728    async fn task_notification_message_yields_injected_parts() -> anyhow::Result<()> {
1729        let corpus = TempDir::new()?;
1730        let project_dir = corpus.path().join("-tmp-pond-test");
1731        std::fs::create_dir_all(&project_dir)?;
1732        let session_uuid = "66666666-6666-6666-6666-666666666666";
1733        let prompt = serde_json::json!({
1734            "type": "user",
1735            "uuid": "u-prompt",
1736            "sessionId": session_uuid,
1737            "cwd": "/tmp/pond-test",
1738            "timestamp": "2026-05-16T00:00:00.000Z",
1739            "message": {"role": "user", "content": "genuine human prompt"},
1740        });
1741        let notification = serde_json::json!({
1742            "type": "user",
1743            "uuid": "u-notify",
1744            "sessionId": session_uuid,
1745            "cwd": "/tmp/pond-test",
1746            "timestamp": "2026-05-16T00:00:01.000Z",
1747            "message": {
1748                "role": "user",
1749                "content": "<task-notification>a background task finished</task-notification>",
1750            },
1751        });
1752        std::fs::write(
1753            project_dir.join(format!("{session_uuid}.jsonl")),
1754            format!("{prompt}\n{notification}\n"),
1755        )?;
1756
1757        let store_dir = TempDir::new()?;
1758        let store = Store::open_local(store_dir.path()).await?;
1759        let adapter = ClaudeCodeAdapter::new(corpus.path());
1760        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1761
1762        let session = store
1763            .get_session(session_uuid)
1764            .await?
1765            .expect("session ingests");
1766        let mut saw_prompt = false;
1767        let mut saw_notification = false;
1768        for stored in &session.messages {
1769            for part in &stored.parts {
1770                if stored.message.id() == "u-prompt" {
1771                    assert_eq!(part.provenance, crate::wire::Provenance::Conversational);
1772                    saw_prompt = true;
1773                }
1774                if stored.message.id() == "u-notify" {
1775                    assert_eq!(part.provenance, crate::wire::Provenance::Injected);
1776                    saw_notification = true;
1777                }
1778            }
1779        }
1780        assert!(saw_prompt && saw_notification, "both messages present");
1781        Ok(())
1782    }
1783
1784    /// Orphan tool_result with no earlier tool_use in the same file: the
1785    /// per-file map can't resolve. The adapter must emit `name: None`, NOT
1786    /// the old `"unknown"` sentinel. Invariant 15 (no synthesized values).
1787    #[tokio::test(flavor = "multi_thread")]
1788    async fn orphan_tool_result_yields_name_none_not_unknown_sentinel() -> anyhow::Result<()> {
1789        let corpus = TempDir::new()?;
1790        let project_dir = corpus.path().join("-tmp-pond-test");
1791        std::fs::create_dir_all(&project_dir)?;
1792        let session_uuid = "55555555-5555-5555-5555-555555555555";
1793
1794        // tool_result with no earlier tool_use (simulates a compaction-pruned call).
1795        let row = serde_json::json!({
1796            "type": "user",
1797            "uuid": "u-orphan",
1798            "sessionId": session_uuid,
1799            "cwd": "/tmp/pond-test",
1800            "timestamp": "2026-05-16T00:00:00.000Z",
1801            "message": {
1802                "role": "user",
1803                "content": [{
1804                    "type": "tool_result",
1805                    "tool_use_id": "toolu_orphan",
1806                    "content": "result body, no matching call",
1807                }],
1808            },
1809        });
1810        std::fs::write(
1811            project_dir.join(format!("{session_uuid}.jsonl")),
1812            format!("{row}\n"),
1813        )?;
1814
1815        let store_dir = TempDir::new()?;
1816        let store = Store::open_local(store_dir.path()).await?;
1817        let adapter = ClaudeCodeAdapter::new(corpus.path());
1818        let _summary =
1819            ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1820        let session = store
1821            .get_session(session_uuid)
1822            .await?
1823            .expect("session ingests");
1824        let mut found = false;
1825        for stored in &session.messages {
1826            for part in &stored.parts {
1827                if let PartKind::ToolResult { name, call_id, .. } = &part.kind {
1828                    assert_eq!(call_id.as_ref().map(|e| e.as_str()), Some("toolu_orphan"));
1829                    assert!(
1830                        name.is_none(),
1831                        "orphan tool_result must be name=None, not synthesized 'unknown'",
1832                    );
1833                    found = true;
1834                }
1835            }
1836        }
1837        assert!(found, "orphan tool_result part must be present");
1838        // Sanity: even an orphan should not be reported as a drop.
1839        Ok(())
1840    }
1841}