1use std::{
10 collections::{HashMap, HashSet},
11 path::{Path, PathBuf},
12};
13
14use anyhow::Context as _;
15use chrono::{DateTime, SecondsFormat, Utc};
16use serde_json::{Value, json};
17
18use crate::{
19 sessions::IngestEvent,
20 wire::{FileData, Message, Part, PartKind, Provenance, ProviderOptions, Session},
21};
22
23use super::{
24 Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
25 RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
26 empty_options,
27 extract::{
28 Extracted, Source, extract_compact_repr, extract_raw_record, extract_self_str, extract_str,
29 },
30 extracted_text,
31 jsonl::{BoundedRow, JsonlTree, jsonl_tree_discover, jsonl_tree_events, source_line},
32 jsonl_bytes, part_id, part_ordinal, raw_record,
33};
34
35#[derive(Debug, Default)]
56pub(crate) struct FileState {
57 seen_uuids: HashSet<String>,
58 tool_call_names: HashMap<String, Extracted<String>>,
59}
60
/// Adapter identifier: reported by the factory and the adapter, and attached
/// to schema errors raised from this module.
const NAME: &str = "claude-code";
65
/// Stateless factory for the Claude Code adapter; see its `AdapterFactory` impl.
pub struct ClaudeCodeFactory;
69
70impl AdapterFactory for ClaudeCodeFactory {
71 fn name(&self) -> &'static str {
72 NAME
73 }
74
75 fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
76 Ok(Box::new(ClaudeCodeAdapter::new(config_path(NAME, config)?)))
77 }
78
79 fn probe_default(&self, env: &Env) -> Option<Value> {
80 let path = env.home.join(".claude").join("projects");
81 path.exists().then(|| json!({ "path": path }))
82 }
83
84 fn serialize(
85 &self,
86 session: &crate::sessions::SessionWithMessages,
87 fidelity: RestoreFidelity,
88 ) -> Result<Vec<RestoredFile>, AdapterError> {
89 serialize_session(session, fidelity)
90 }
91}
92
93fn serialize_session(
94 session: &crate::sessions::SessionWithMessages,
95 fidelity: RestoreFidelity,
96) -> Result<Vec<RestoredFile>, AdapterError> {
97 let mut messages = session.messages.clone();
98 if fidelity == RestoreFidelity::Native {
99 messages.sort_by(|left, right| {
100 source_line(left.message.options())
101 .cmp(&source_line(right.message.options()))
102 .then_with(|| by_timestamp_then_id(left, right))
103 });
104 } else {
105 messages.sort_by(by_timestamp_then_id);
106 }
107 let mut records = Vec::with_capacity(messages.len());
111 let mut parent_uuid = None::<String>;
112 for message in &messages {
113 if fidelity == RestoreFidelity::Native
114 && let Some(raw) = raw_record(message.message.options())
115 {
116 parent_uuid = raw
117 .get("uuid")
118 .and_then(Value::as_str)
119 .map(ToOwned::to_owned)
120 .or(parent_uuid);
121 records.push(raw);
122 continue;
123 }
124 let Some(record) = claude_record(session, message, parent_uuid.as_deref()) else {
127 continue;
128 };
129 parent_uuid = record
130 .get("uuid")
131 .and_then(Value::as_str)
132 .map(ToOwned::to_owned);
133 records.push(record);
134 }
135
136 let mut files = vec![RestoredFile::new(
137 claude_relative_path(session),
138 jsonl_bytes(NAME, &records)?,
139 fidelity,
140 )];
141 if session.session.parent_session_id.is_some()
142 && let Some(meta) = subagent_meta_record(session)
143 {
144 let mut meta_path = files[0].relative_path.clone();
145 meta_path.set_extension("meta.json");
146 files.push(RestoredFile::new(
147 meta_path,
148 serde_json::to_vec(&meta).map_err(|err| {
149 AdapterError::schema(
150 NAME,
151 &session.session.id,
152 format!("json encode failed: {err}"),
153 )
154 })?,
155 fidelity,
156 ));
157 }
158 Ok(files)
159}
160
161fn claude_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
162 let encoded_project = session
163 .session
164 .options
165 .get("source")
166 .and_then(|source| source.get("project_dir"))
167 .and_then(Value::as_str)
168 .map(ToOwned::to_owned)
169 .unwrap_or_else(|| encode_project(&session.session.project));
170 if let Some(parent) = &session.session.parent_session_id {
171 let child_suffix = session
176 .session
177 .id
178 .strip_prefix(&format!("{parent}/"))
179 .unwrap_or(&session.session.id);
180 return PathBuf::from(encoded_project)
181 .join(parent)
182 .join("subagents")
183 .join(format!("{child_suffix}.jsonl"));
184 }
185 PathBuf::from(encoded_project).join(format!("{}.jsonl", session.session.id))
186}
187
/// Encodes a project path as a directory name by turning every `/` and `.`
/// into `-`; all other characters pass through unchanged.
fn encode_project(project: &str) -> String {
    project
        .chars()
        .map(|ch| if matches!(ch, '/' | '.') { '-' } else { ch })
        .collect()
}
191
192fn subagent_meta_record(session: &crate::sessions::SessionWithMessages) -> Option<Value> {
193 let meta = session.session.options.get("subagent")?.get("meta")?;
197 meta.is_object().then(|| meta.clone())
198}
199
200fn claude_record(
201 session: &crate::sessions::SessionWithMessages,
202 message: &crate::sessions::MessageWithParts,
203 parent_uuid: Option<&str>,
204) -> Option<Value> {
205 let row_role = match &message.message {
213 Message::System { .. } => return None,
214 Message::User { .. } | Message::Tool { .. } => "user",
215 Message::Assistant { .. } => "assistant",
216 };
217 let mut envelope = serde_json::Map::new();
218 envelope.insert("role".to_owned(), Value::String(row_role.to_owned()));
219 if row_role == "assistant" {
220 envelope.insert("type".to_owned(), Value::String("message".to_owned()));
223 }
224 envelope.insert(
225 "content".to_owned(),
226 Value::Array(message.parts.iter().map(claude_part).collect()),
227 );
228 Some(json!({
229 "parentUuid": parent_uuid,
230 "isSidechain": false,
231 "userType": "external",
232 "cwd": &*session.session.project,
233 "sessionId": &session.session.id,
234 "type": row_role,
235 "message": Value::Object(envelope),
236 "uuid": message.message.id(),
237 "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
238 }))
239}
240
241fn claude_part(part: &Part) -> Value {
242 match &part.kind {
243 PartKind::Text { text } => json!({"type": "text", "text": extracted_text(text)}),
244 PartKind::Reasoning { text } => {
245 json!({"type": "thinking", "thinking": extracted_text(text)})
246 }
247 PartKind::ToolCall {
248 call_id,
249 name,
250 params,
251 provider_executed,
252 } => json!({
253 "type": if *provider_executed { "server_tool_use" } else { "tool_use" },
254 "id": extracted_text(call_id),
255 "name": extracted_text(name),
256 "input": params,
257 }),
258 PartKind::ToolResult {
259 call_id,
260 is_failure,
261 result,
262 ..
263 } => json!({
264 "type": "tool_result",
265 "tool_use_id": extracted_text(call_id),
266 "is_error": is_failure,
267 "content": result,
268 }),
269 PartKind::File {
270 media_type,
271 file_name,
272 data,
273 } => json!({
274 "type": "file",
275 "media_type": media_type,
276 "file_name": file_name,
277 "source": file_source(data),
278 }),
279 other => {
280 json!({"type": "text", "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null))})
281 }
282 }
283}
284
285fn file_source(data: &FileData) -> Value {
286 match data {
287 FileData::String(value) => json!({"type": "text", "data": value}),
288 FileData::Bytes(value) => json!({"type": "base64", "data": value}),
289 FileData::Url(value) => json!({"type": "url", "url": value}),
290 }
291}
292
/// Adapter over a directory tree of Claude Code JSONL transcripts.
#[derive(Clone, Debug)]
pub struct ClaudeCodeAdapter {
    /// Directory handed out as the root of the JSONL tree.
    root: PathBuf,
}
299
300impl ClaudeCodeAdapter {
301 pub fn new(root: impl Into<PathBuf>) -> Self {
302 Self { root: root.into() }
303 }
304}
305
306impl Adapter for ClaudeCodeAdapter {
307 fn discover(&self) -> DiscoverFuture<'_> {
308 jsonl_tree_discover(self)
309 }
310
311 fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
312 jsonl_tree_events(self, oracle)
313 }
314}
315
316impl JsonlTree for ClaudeCodeAdapter {
317 type State = FileState;
318
319 fn name(&self) -> &'static str {
320 NAME
321 }
322
323 fn root(&self) -> &Path {
324 &self.root
325 }
326
327 fn peek_session_id(&self, path: &Path, first_line: &str) -> Option<String> {
328 if subagents_dir(path).is_some() {
335 let (parent_uuid, child_suffix, _) = subagent_ids(path)?;
336 return Some(format!("{parent_uuid}/{child_suffix}"));
337 }
338 let row: Value = serde_json::from_str(first_line).ok()?;
339 row.get("sessionId")?.as_str().map(ToOwned::to_owned)
340 }
341
342 fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
343 session_from_rows(path, rows)
344 }
345
346 fn events_from_row(
347 &self,
348 session: &Session,
349 row: &BoundedRow,
350 state: &mut Self::State,
351 ) -> Result<Vec<IngestEvent>, String> {
352 if let Some(uuid) = row.value.get("uuid").and_then(Value::as_str)
353 && !state.seen_uuids.insert(uuid.to_owned())
354 {
355 return Ok(Vec::new());
356 }
357 capture_tool_call_names(&row.value, &mut state.tool_call_names);
358 events_from_row(&session.id, row.line, &row.value, session.created_at, state)
359 }
360
361 fn unsupported_reason(&self, path: &Path) -> Option<String> {
362 if subagents_dir(path).is_some()
371 && subagent_ids(path).is_none()
372 && !is_workflow_control_file(path)
373 {
374 return Some(format!(
375 "{}: subagent transcript layout not recognized by this pond version; \
376 skipped so it is not merged into the parent session - update pond and \
377 re-run `pond sync`",
378 path.display()
379 ));
380 }
381 None
382 }
383}
384
385fn is_workflow_control_file(path: &Path) -> bool {
391 subagents_dir(path).is_some()
392 && path.file_name().and_then(|n| n.to_str()) == Some("journal.jsonl")
393}
394
395fn capture_tool_call_names(row: &Value, map: &mut HashMap<String, Extracted<String>>) {
400 let Some(items) = row
401 .get("message")
402 .and_then(|message| message.get("content"))
403 .and_then(Value::as_array)
404 else {
405 return;
406 };
407 for item in items {
408 let kind = item.get("type").and_then(Value::as_str);
409 if !matches!(kind, Some("tool_use") | Some("server_tool_use")) {
410 continue;
411 }
412 let (Some(id), Some(name)) = (item.str_field("id"), extract_str(item, "name")) else {
413 continue;
414 };
415 map.insert(id.to_owned(), name);
416 }
417}
418
419fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
420 let path_display = path.display().to_string();
421 if subagents_dir(path).is_some() && subagent_ids(path).is_none() {
426 return Err(AdapterError::schema(
427 NAME,
428 path_display,
429 "sidecar/control file under subagents/ has no session of its own",
430 ));
431 }
432 let mut created_at = None;
433 let mut project: Option<Extracted<String>> = None;
434 let mut version = None;
435 for row in rows {
436 if created_at.is_none() {
437 created_at = parse_timestamp(&row.value).ok();
438 }
439 if project.is_none() {
440 project = extract_str(&row.value, "cwd");
441 }
442 if version.is_none() {
443 version = row
444 .value
445 .get("version")
446 .and_then(Value::as_str)
447 .map(ToOwned::to_owned);
448 }
449 }
450
451 let first = rows
452 .first()
453 .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
454 let at_first = format!("{path_display}:{}", first.line);
455 let raw_session_id = first
456 .value
457 .get("sessionId")
458 .and_then(Value::as_str)
459 .ok_or_else(|| {
460 AdapterError::schema(
461 NAME,
462 at_first.clone(),
463 format!("line {} missing sessionId", first.line),
464 )
465 })?
466 .to_owned();
467 let created_at = created_at.ok_or_else(|| {
468 AdapterError::schema(NAME, at_first, "session has no parseable timestamp")
469 })?;
470
471 let subagent = subagent_descriptor(path);
484 let project_dir = source_project_dir(path, subagent.is_some());
485 let (session_id, parent_session_id, source_agent, subagent_options) = match subagent {
486 Some(SubagentDescriptor {
487 parent_uuid,
488 child_suffix,
489 agent_hash,
490 agent_type,
491 meta,
492 }) => {
493 let child_id = format!("{parent_uuid}/{child_suffix}");
494 let agent_label = agent_type
495 .as_deref()
496 .map(|t| format!("claude-code/{t}"))
497 .unwrap_or_else(|| "claude-code/subagent".to_owned());
498 let metadata = json!({
502 "hash": agent_hash,
503 "raw_session_id": raw_session_id,
504 "meta": meta,
505 });
506 (child_id, Some(parent_uuid), agent_label, Some(metadata))
507 }
508 None => (raw_session_id, None, "claude-code".to_owned(), None),
509 };
510
511 let project = match project {
512 Some(value) => value,
513 None => {
514 let decoded = path
515 .parent()
516 .and_then(|p| p.file_name())
517 .and_then(|n| n.to_str())
518 .map(|s| s.replace('-', "/"))
519 .ok_or_else(|| {
520 AdapterError::schema(
521 NAME,
522 path_display.clone(),
523 "no `cwd` field in any row and source path is not UTF-8",
524 )
525 })?;
526 extract_self_str(&Value::String(decoded)).ok_or_else(|| {
527 AdapterError::schema(
528 NAME,
529 path_display.clone(),
530 "internal: Value::String produced None from Source::as_str",
531 )
532 })?
533 }
534 };
535
536 let mut options = ProviderOptions::new();
537 options.insert(
538 "source".to_owned(),
539 json!({
540 "adapter": "claude-code",
541 "version": version,
542 "project_dir": project_dir,
543 "workspace_path": &*project,
544 }),
545 );
546 if let Some(metadata) = subagent_options {
547 options.insert("subagent".to_owned(), metadata);
548 }
549
550 Ok(Session {
551 id: session_id,
552 parent_session_id,
553 parent_message_id: None,
554 source_agent,
555 created_at,
556 project,
557 options,
558 })
559}
560
561fn source_project_dir(path: &Path, is_subagent: bool) -> Option<String> {
562 let project_dir = if is_subagent {
567 subagents_dir(path)?.parent()?.parent()
568 } else {
569 path.parent()
570 };
571 project_dir
572 .and_then(|p| p.file_name())
573 .and_then(|n| n.to_str())
574 .map(ToOwned::to_owned)
575}
576
/// Finds the nearest ancestor directory of `path` that is named `subagents`.
///
/// `path` itself is never considered, only its parents.
fn subagents_dir(path: &Path) -> Option<&Path> {
    path.ancestors()
        // `ancestors` yields `path` first; start from its parent.
        .skip(1)
        .find(|dir| dir.file_name().and_then(|name| name.to_str()) == Some("subagents"))
}
591
592struct SubagentDescriptor {
598 parent_uuid: String,
599 child_suffix: String,
600 agent_hash: String,
601 agent_type: Option<String>,
602 meta: Option<Value>,
603}
604
605fn subagent_ids(path: &Path) -> Option<(String, String, String)> {
613 let file_name = path.file_name()?.to_str()?;
614 let agent_hash = file_name
615 .strip_prefix("agent-")?
616 .strip_suffix(".jsonl")?
617 .to_owned();
618 let subagents = subagents_dir(path)?;
619 let parent_uuid = subagents.parent()?.file_name()?.to_str()?.to_owned();
620 let child_suffix = path
624 .strip_prefix(subagents)
625 .ok()?
626 .with_extension("")
627 .to_str()?
628 .replace(std::path::MAIN_SEPARATOR, "/");
629 Some((parent_uuid, child_suffix, agent_hash))
630}
631
632fn subagent_descriptor(path: &Path) -> Option<SubagentDescriptor> {
635 let (parent_uuid, child_suffix, agent_hash) = subagent_ids(path)?;
636 let meta_path = path.parent()?.join(format!("agent-{agent_hash}.meta.json"));
637 let (agent_type, meta) = match std::fs::read(&meta_path) {
638 Ok(bytes) => match serde_json::from_slice::<Value>(&bytes) {
639 Ok(value) => (
640 value
641 .get("agentType")
642 .and_then(Value::as_str)
643 .map(ToOwned::to_owned),
644 Some(value),
645 ),
646 Err(error) => {
647 tracing::debug!(
648 target: "pond::adapter::claude_code",
649 meta = %meta_path.display(),
650 %error,
651 "subagent .meta.json present but unparseable; falling back to 'claude-code/subagent'",
652 );
653 (None, None)
654 }
655 },
656 Err(error) if error.kind() == std::io::ErrorKind::NotFound => (None, None),
657 Err(error) => {
658 tracing::debug!(
659 target: "pond::adapter::claude_code",
660 meta = %meta_path.display(),
661 %error,
662 "subagent .meta.json IO error; falling back to 'claude-code/subagent'",
663 );
664 (None, None)
665 }
666 };
667
668 Some(SubagentDescriptor {
669 parent_uuid,
670 child_suffix,
671 agent_hash,
672 agent_type,
673 meta,
674 })
675}
676
677fn events_from_row(
678 session_id: &str,
679 line: usize,
680 row: &Value,
681 default_timestamp: DateTime<Utc>,
682 state: &FileState,
683) -> Result<Vec<IngestEvent>, String> {
684 let timestamp = parse_timestamp(row).unwrap_or(default_timestamp);
685 let uuid = row
686 .get("uuid")
687 .and_then(Value::as_str)
688 .map_or_else(|| format!("{session_id}:{line}"), ToOwned::to_owned);
689
690 if let Some(message_value) = row.get("message") {
691 return message_events(
692 session_id,
693 &uuid,
694 timestamp,
695 row,
696 message_value,
697 state,
698 line,
699 );
700 }
701
702 let raw_type = row.get("type").and_then(Value::as_str);
709 let content = if raw_type == Some("attachment") {
710 row.get("attachment")
711 .and_then(attachment_content)
712 .or_else(|| Some(extract_compact_repr(row)))
713 } else {
714 extract_str(row, "subtype").or_else(|| extract_str(row, "type"))
715 };
716 let message = Message::System {
717 id: uuid,
718 session_id: session_id.to_owned(),
719 timestamp,
720 content,
721 options: row_options(row, line),
722 };
723 Ok(vec![IngestEvent::Message(message)])
724}
725
726fn message_events(
727 session_id: &str,
728 uuid: &str,
729 timestamp: DateTime<Utc>,
730 row: &Value,
731 message_value: &Value,
732 state: &FileState,
733 line: usize,
734) -> Result<Vec<IngestEvent>, String> {
735 let role = message_value
736 .get("role")
737 .and_then(Value::as_str)
738 .ok_or_else(|| "message missing role".to_owned())?;
739 let content = message_value.get("content").unwrap_or(&Value::Null);
740 let mut parts = Vec::new();
741 let message = match (role, content) {
742 ("user", Value::String(text)) => {
743 let provenance = user_text_provenance(row, text);
747 parts.push(text_part(
748 session_id,
749 uuid,
750 0,
751 extract_self_str(content),
752 provenance,
753 ));
754 Message::User {
755 id: uuid.to_owned(),
756 session_id: session_id.to_owned(),
757 timestamp,
758 options: row_options(row, line),
759 }
760 }
761 ("user", Value::Array(items)) if items.iter().all(is_tool_result) => {
762 let source_tool_result = row.get("toolUseResult").cloned();
763 parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
764 tool_result_part(
765 session_id,
766 uuid,
767 ordinal,
768 item,
769 source_tool_result.as_ref(),
770 state,
771 )
772 }));
773 Message::Tool {
774 id: uuid.to_owned(),
775 session_id: session_id.to_owned(),
776 timestamp,
777 options: row_options(row, line),
778 }
779 }
780 ("user", Value::Array(items)) => {
781 let provenance = user_array_provenance(row, items);
784 parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
785 user_part(session_id, uuid, ordinal, item, state, provenance)
786 }));
787 Message::User {
788 id: uuid.to_owned(),
789 session_id: session_id.to_owned(),
790 timestamp,
791 options: row_options(row, line),
792 }
793 }
794 ("assistant", Value::Array(items)) => {
795 parts.extend(
796 items
797 .iter()
798 .enumerate()
799 .map(|(ordinal, item)| assistant_part(session_id, uuid, ordinal, item)),
800 );
801 Message::Assistant {
802 id: uuid.to_owned(),
803 session_id: session_id.to_owned(),
804 timestamp,
805 options: assistant_options(row, message_value, line),
806 }
807 }
808 ("system", Value::String(_)) => Message::System {
809 id: uuid.to_owned(),
810 session_id: session_id.to_owned(),
811 timestamp,
812 content: extract_self_str(content),
813 options: row_options(row, line),
814 },
815 ("system", _) => Message::System {
816 id: uuid.to_owned(),
817 session_id: session_id.to_owned(),
818 timestamp,
819 content: Some(extract_compact_repr(message_value)),
824 options: row_options(row, line),
825 },
826 (other, _) => {
827 return Err(format!("unsupported message role {other}"));
828 }
829 };
830
831 let mut events = Vec::with_capacity(parts.len() + 1);
832 events.push(IngestEvent::Message(message));
833 events.extend(parts.into_iter().map(IngestEvent::Part));
834 Ok(events)
835}
836
837fn text_part(
838 session_id: &str,
839 message_id: &str,
840 ordinal: usize,
841 text: Option<Extracted<String>>,
842 provenance: Provenance,
843) -> Part {
844 Part {
845 session_id: session_id.to_owned(),
846 id: part_id(message_id, ordinal),
847 message_id: message_id.to_owned(),
848 ordinal: part_ordinal(ordinal),
849 provenance,
850 options: empty_options(),
851 kind: PartKind::Text { text },
852 }
853}
854
855fn user_part(
856 session_id: &str,
857 message_id: &str,
858 ordinal: usize,
859 value: &Value,
860 state: &FileState,
861 provenance: Provenance,
862) -> Part {
863 match value.get("type").and_then(Value::as_str) {
864 Some("text") => text_part(
865 session_id,
866 message_id,
867 ordinal,
868 extract_str(value, "text"),
869 provenance,
870 ),
871 Some("image") | Some("file") => {
872 file_part(session_id, message_id, ordinal, value, provenance)
873 }
874 Some("tool_result") => {
875 tool_result_part(session_id, message_id, ordinal, value, None, state)
876 }
877 _ => text_part(
881 session_id,
882 message_id,
883 ordinal,
884 Some(extract_compact_repr(value)),
885 provenance,
886 ),
887 }
888}
889
890fn assistant_part(session_id: &str, message_id: &str, ordinal: usize, value: &Value) -> Part {
891 match value.get("type").and_then(Value::as_str) {
895 Some("text") => text_part(
896 session_id,
897 message_id,
898 ordinal,
899 extract_str(value, "text"),
900 Provenance::Conversational,
901 ),
902 Some("thinking") => Part {
903 session_id: session_id.to_owned(),
904 id: part_id(message_id, ordinal),
905 message_id: message_id.to_owned(),
906 ordinal: part_ordinal(ordinal),
907 provenance: Provenance::Conversational,
908 options: signature_options(value),
909 kind: PartKind::Reasoning {
910 text: extract_str(value, "thinking"),
911 },
912 },
913 Some("tool_use") => Part {
914 session_id: session_id.to_owned(),
915 id: part_id(message_id, ordinal),
916 message_id: message_id.to_owned(),
917 ordinal: part_ordinal(ordinal),
918 provenance: Provenance::Conversational,
919 options: empty_options(),
920 kind: PartKind::ToolCall {
921 call_id: extract_str(value, "id"),
922 name: extract_str(value, "name"),
923 params: value.get("input").cloned().unwrap_or(Value::Null),
924 provider_executed: false,
925 },
926 },
927 Some("server_tool_use") => Part {
928 session_id: session_id.to_owned(),
929 id: part_id(message_id, ordinal),
930 message_id: message_id.to_owned(),
931 ordinal: part_ordinal(ordinal),
932 provenance: Provenance::Conversational,
933 options: empty_options(),
934 kind: PartKind::ToolCall {
935 call_id: extract_str(value, "id"),
936 name: extract_str(value, "name"),
937 params: value.get("input").cloned().unwrap_or(Value::Null),
938 provider_executed: true,
939 },
940 },
941 Some("image") | Some("file") => file_part(
942 session_id,
943 message_id,
944 ordinal,
945 value,
946 Provenance::Conversational,
947 ),
948 _ => text_part(
951 session_id,
952 message_id,
953 ordinal,
954 Some(extract_compact_repr(value)),
955 Provenance::Conversational,
956 ),
957 }
958}
959
960fn tool_result_part(
961 session_id: &str,
962 message_id: &str,
963 ordinal: usize,
964 value: &Value,
965 source_tool_result: Option<&Value>,
966 state: &FileState,
967) -> Part {
968 let call_id = extract_str(value, "tool_use_id");
969 let name = value
975 .str_field("tool_use_id")
976 .and_then(|id| state.tool_call_names.get(id))
977 .cloned();
978 let result = value
979 .get("content")
980 .cloned()
981 .or_else(|| source_tool_result.cloned())
982 .unwrap_or(Value::Null);
983 Part {
984 session_id: session_id.to_owned(),
985 id: part_id(message_id, ordinal),
986 message_id: message_id.to_owned(),
987 ordinal: part_ordinal(ordinal),
988 provenance: Provenance::Injected,
991 options: empty_options(),
992 kind: PartKind::ToolResult {
993 call_id,
994 name,
995 is_failure: value
996 .get("is_error")
997 .and_then(Value::as_bool)
998 .unwrap_or(false),
999 result,
1000 },
1001 }
1002}
1003
1004fn file_part(
1005 session_id: &str,
1006 message_id: &str,
1007 ordinal: usize,
1008 value: &Value,
1009 provenance: Provenance,
1010) -> Part {
1011 let media_type = value
1012 .get("media_type")
1013 .or_else(|| value.get("mime_type"))
1014 .and_then(Value::as_str)
1015 .map(ToOwned::to_owned);
1016 let file_name = value
1017 .get("file_name")
1018 .or_else(|| value.get("name"))
1019 .and_then(Value::as_str)
1020 .map(ToOwned::to_owned);
1021 let data = if let Some(source) = value.get("source") {
1022 if let Some(url) = source.get("url").and_then(Value::as_str) {
1023 FileData::Url(url.to_owned())
1024 } else if let Some(bytes) = source.get("data").and_then(Value::as_str) {
1025 FileData::String(bytes.to_owned())
1026 } else {
1027 FileData::String(compact_json(source))
1028 }
1029 } else if let Some(url) = value.get("url").and_then(Value::as_str) {
1030 FileData::Url(url.to_owned())
1031 } else {
1032 FileData::String(compact_json(value))
1033 };
1034
1035 Part {
1036 session_id: session_id.to_owned(),
1037 id: part_id(message_id, ordinal),
1038 message_id: message_id.to_owned(),
1039 ordinal: part_ordinal(ordinal),
1040 provenance,
1041 options: empty_options(),
1042 kind: PartKind::File {
1043 media_type,
1044 file_name,
1045 data,
1046 },
1047 }
1048}
1049
1050fn row_options(row: &Value, line: usize) -> ProviderOptions {
1051 let mut options = ProviderOptions::new();
1052 let source = json!({
1053 "line": line,
1054 "parent_uuid": row.get("parentUuid"),
1055 "is_sidechain": row.get("isSidechain"),
1056 "user_type": row.get("userType"),
1057 "entrypoint": row.get("entrypoint"),
1058 "cwd": row.get("cwd"),
1059 "version": row.get("version"),
1060 "git_branch": row.get("gitBranch"),
1061 "request_id": row.get("requestId"),
1062 "raw_type": row.get("type"),
1063 "raw_record": extract_raw_record(row),
1064 });
1065 options.insert("source".to_owned(), source);
1066 options
1067}
1068
1069fn assistant_options(row: &Value, message_value: &Value, line: usize) -> ProviderOptions {
1070 let mut options = row_options(row, line);
1071 let anthropic = json!({
1072 "id": message_value.get("id"),
1073 "model": message_value.get("model"),
1074 "stop_reason": message_value.get("stop_reason"),
1075 "stop_sequence": message_value.get("stop_sequence"),
1076 "usage": message_value.get("usage"),
1077 });
1078 options.insert("anthropic".to_owned(), anthropic);
1079 options
1080}
1081
1082fn signature_options(value: &Value) -> ProviderOptions {
1083 let mut options = ProviderOptions::new();
1084 if let Some(signature) = value.get("signature").and_then(Value::as_str) {
1085 options.insert("anthropic".to_owned(), json!({"signature": signature}));
1086 }
1087 options
1088}
1089
1090fn attachment_content(value: &Value) -> Option<Extracted<String>> {
1091 extract_str(value, "content").or_else(|| extract_str(value, "stdout"))
1092}
1093
1094fn parse_timestamp(value: &Value) -> anyhow::Result<DateTime<Utc>> {
1095 let timestamp = value
1096 .get("timestamp")
1097 .and_then(Value::as_str)
1098 .context("missing timestamp")?;
1099 Ok(DateTime::parse_from_rfc3339(timestamp)
1100 .context("invalid timestamp")?
1101 .with_timezone(&Utc))
1102}
1103
1104fn is_tool_result(value: &Value) -> bool {
1105 value.get("type").and_then(Value::as_str) == Some("tool_result")
1106}
1107
1108fn is_meta_row(row: &Value) -> bool {
1111 row.get("isMeta").and_then(Value::as_bool) == Some(true)
1112}
1113
/// True when `text`, ignoring leading whitespace, starts with one of the
/// markers this adapter treats as injected rather than typed by the user.
fn is_injected_user_text(text: &str) -> bool {
    const INJECTED_PREFIXES: [&str; 7] = [
        "<task-notification>",
        "<command-name>",
        "<command-message>",
        "<command-args>",
        "<local-command-caveat>",
        "<local-command-stdout>",
        "[Request interrupted by user",
    ];
    let body = text.trim_start();
    INJECTED_PREFIXES
        .iter()
        .any(|prefix| body.starts_with(prefix))
}
1127
1128fn user_text_provenance(row: &Value, text: &str) -> Provenance {
1131 if is_meta_row(row) || is_injected_user_text(text) {
1132 Provenance::Injected
1133 } else {
1134 Provenance::Conversational
1135 }
1136}
1137
1138fn user_array_provenance(row: &Value, items: &[Value]) -> Provenance {
1142 if is_meta_row(row) {
1143 return Provenance::Injected;
1144 }
1145 let wrapped = items.iter().any(|item| {
1146 item.get("type").and_then(Value::as_str) == Some("text")
1147 && item
1148 .get("text")
1149 .and_then(Value::as_str)
1150 .is_some_and(is_injected_user_text)
1151 });
1152 if wrapped {
1153 Provenance::Injected
1154 } else {
1155 Provenance::Conversational
1156 }
1157}
1158
1159#[cfg(test)]
1160mod tests {
1161 #![allow(clippy::expect_used, clippy::unwrap_used)]
1169
1170 use super::*;
1171 use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
1172 use tempfile::TempDir;
1173
1174 const FIXTURE_ROOT: &str = "tests/fixtures/adapter/claude_code/projects";
1175
/// The factory's default probe should resolve to `.claude/projects` under home.
#[test]
fn probe_default_finds_claude_projects_under_home() -> anyhow::Result<()> {
    let expected_components = [".claude", "projects"];
    crate::adapter::test_support::assert_probe_default(&ClaudeCodeFactory, &expected_components)
}
1183
1184 #[tokio::test(flavor = "multi_thread")]
1185 async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
1186 let adapter = ClaudeCodeAdapter::new(FIXTURE_ROOT);
1187 crate::adapter::test_support::assert_native_restore(
1188 &ClaudeCodeFactory,
1189 &adapter,
1190 std::path::Path::new(FIXTURE_ROOT),
1191 )
1192 .await
1193 }
1194
1195 #[tokio::test(flavor = "multi_thread")]
1203 async fn subagent_file_derives_child_session_with_parent_link() -> anyhow::Result<()> {
1204 let corpus = TempDir::new()?;
1205 let project_dir = corpus.path().join("-tmp-pond-test");
1206 let parent_uuid = "11111111-1111-1111-1111-111111111111";
1207 let agent_hash = "abc123def456";
1208 std::fs::create_dir_all(project_dir.join(parent_uuid).join("subagents"))?;
1209
1210 let parent_row = serde_json::json!({
1212 "type": "user",
1213 "uuid": "u-parent-1",
1214 "sessionId": parent_uuid,
1215 "cwd": "/tmp/pond-test",
1216 "timestamp": "2026-05-16T00:00:00.000Z",
1217 "version": "2.1.121",
1218 "message": {"role": "user", "content": "hi parent"},
1219 });
1220 std::fs::write(
1221 project_dir.join(format!("{parent_uuid}.jsonl")),
1222 format!("{parent_row}\n"),
1223 )?;
1224
1225 let subagent_row = serde_json::json!({
1228 "type": "user",
1229 "uuid": "u-sub-1",
1230 "sessionId": parent_uuid,
1231 "cwd": "/tmp/pond-test",
1232 "isSidechain": true,
1233 "agentId": agent_hash,
1234 "timestamp": "2026-05-16T00:01:00.000Z",
1235 "version": "2.1.121",
1236 "message": {"role": "user", "content": "subagent prompt"},
1237 });
1238 std::fs::write(
1239 project_dir
1240 .join(parent_uuid)
1241 .join("subagents")
1242 .join(format!("agent-{agent_hash}.jsonl")),
1243 format!("{subagent_row}\n"),
1244 )?;
1245 std::fs::write(
1246 project_dir
1247 .join(parent_uuid)
1248 .join("subagents")
1249 .join(format!("agent-{agent_hash}.meta.json")),
1250 r#"{"agentType":"general-purpose","description":"do a thing"}"#,
1251 )?;
1252
1253 let store_dir = TempDir::new()?;
1254 let store = Store::open_local(store_dir.path()).await?;
1255 let adapter = ClaudeCodeAdapter::new(corpus.path());
1256
1257 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1258 assert_eq!(
1259 summary.dropped_sessions, 0,
1260 "subagent file must NOT collide with parent (pre-fix this was the project-immutable rejection)"
1261 );
1262
1263 let parent = store
1264 .get_session(parent_uuid)
1265 .await?
1266 .expect("parent session should ingest as the bare uuid");
1267 assert_eq!(parent.session.source_agent, "claude-code");
1268 assert_eq!(parent.session.parent_session_id, None);
1269
1270 let child_id = format!("{parent_uuid}/agent-{agent_hash}");
1271 let child = store
1272 .get_session(&child_id)
1273 .await?
1274 .expect("subagent session must surface under the derived id");
1275 assert_eq!(
1276 child.session.source_agent, "claude-code/general-purpose",
1277 "agent_type from .meta.json should suffix the source_agent label"
1278 );
1279 assert_eq!(
1280 child.session.parent_session_id.as_deref(),
1281 Some(parent_uuid),
1282 "subagent must link back to parent via parent_session_id",
1283 );
1284 let subagent_meta = child
1285 .session
1286 .options
1287 .get("subagent")
1288 .expect("options.subagent must carry the hash + verbatim meta.json");
1289 assert_eq!(subagent_meta["hash"], serde_json::json!(agent_hash));
1290 assert_eq!(
1291 subagent_meta["meta"]["agentType"],
1292 serde_json::json!("general-purpose")
1293 );
1294 assert_eq!(
1295 subagent_meta["meta"]["description"],
1296 serde_json::json!("do a thing")
1297 );
1298 Ok(())
1299 }
1300
1301 #[tokio::test(flavor = "multi_thread")]
1305 async fn subagent_without_meta_falls_back_to_generic_label() -> anyhow::Result<()> {
1306 let corpus = TempDir::new()?;
1307 let project_dir = corpus.path().join("-tmp-pond-test");
1308 let parent_uuid = "22222222-2222-2222-2222-222222222222";
1309 let agent_hash = "deadbeef";
1310 std::fs::create_dir_all(project_dir.join(parent_uuid).join("subagents"))?;
1311 let row = serde_json::json!({
1312 "type": "user",
1313 "uuid": "u-sub-only",
1314 "sessionId": parent_uuid,
1315 "cwd": "/tmp/pond-test",
1316 "timestamp": "2026-05-16T00:00:00.000Z",
1317 "message": {"role": "user", "content": "no meta sibling here"},
1318 });
1319 std::fs::write(
1320 project_dir
1321 .join(parent_uuid)
1322 .join("subagents")
1323 .join(format!("agent-{agent_hash}.jsonl")),
1324 format!("{row}\n"),
1325 )?;
1326
1327 let store_dir = TempDir::new()?;
1328 let store = Store::open_local(store_dir.path()).await?;
1329 let adapter = ClaudeCodeAdapter::new(corpus.path());
1330 let _summary =
1331 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1332
1333 let child = store
1334 .get_session(&format!("{parent_uuid}/agent-{agent_hash}"))
1335 .await?
1336 .expect("derived child id even without meta");
1337 assert_eq!(child.session.source_agent, "claude-code/subagent");
1338 Ok(())
1339 }
1340
1341 #[tokio::test(flavor = "multi_thread")]
1349 async fn workflow_nested_subagent_derives_distinct_child_not_parent_collision()
1350 -> anyhow::Result<()> {
1351 let corpus = TempDir::new()?;
1352 let project_dir = corpus.path().join("-tmp-pond-test");
1353 let parent_uuid = "44444444-4444-4444-4444-444444444444";
1354 let wf_id = "wf_abcd1234-ef0";
1355 let agent_hash = "cafef00dbaadf00d1";
1356 let wf_dir = project_dir
1357 .join(parent_uuid)
1358 .join("subagents")
1359 .join("workflows")
1360 .join(wf_id);
1361 std::fs::create_dir_all(&wf_dir)?;
1362
1363 let parent_row = serde_json::json!({
1364 "type": "user",
1365 "uuid": "u-parent-1",
1366 "sessionId": parent_uuid,
1367 "cwd": "/tmp/pond-test",
1368 "timestamp": "2026-05-20T00:00:00.000Z",
1369 "message": {"role": "user", "content": "hi parent"},
1370 });
1371 std::fs::write(
1372 project_dir.join(format!("{parent_uuid}.jsonl")),
1373 format!("{parent_row}\n"),
1374 )?;
1375
1376 let subagent_row = serde_json::json!({
1378 "type": "user",
1379 "uuid": "u-wf-sub-1",
1380 "sessionId": parent_uuid,
1381 "cwd": "/tmp/pond-test/packages/sub",
1382 "isSidechain": true,
1383 "agentId": agent_hash,
1384 "timestamp": "2026-05-20T00:01:00.000Z",
1385 "message": {"role": "user", "content": "workflow subagent prompt"},
1386 });
1387 std::fs::write(
1388 wf_dir.join(format!("agent-{agent_hash}.jsonl")),
1389 format!("{subagent_row}\n"),
1390 )?;
1391 std::fs::write(
1392 wf_dir.join(format!("agent-{agent_hash}.meta.json")),
1393 r#"{"agentType":"general-purpose","description":"workflow child"}"#,
1394 )?;
1395
1396 let store_dir = TempDir::new()?;
1397 let store = Store::open_local(store_dir.path()).await?;
1398 let adapter = ClaudeCodeAdapter::new(corpus.path());
1399 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1400 assert_eq!(
1401 summary.dropped_sessions, 0,
1402 "nested workflow subagent must NOT collide with the parent project",
1403 );
1404
1405 let parent = store
1406 .get_session(parent_uuid)
1407 .await?
1408 .expect("parent session ingests under the bare uuid");
1409 assert_eq!(&*parent.session.project, "/tmp/pond-test");
1410 assert_eq!(parent.session.parent_session_id, None);
1411
1412 let child_id = format!("{parent_uuid}/workflows/{wf_id}/agent-{agent_hash}");
1413 let child = store
1414 .get_session(&child_id)
1415 .await?
1416 .expect("workflow subagent surfaces under the full nested child id");
1417 assert_eq!(child.session.source_agent, "claude-code/general-purpose");
1418 assert_eq!(
1419 child.session.parent_session_id.as_deref(),
1420 Some(parent_uuid)
1421 );
1422 assert_eq!(
1423 &*child.session.project, "/tmp/pond-test/packages/sub",
1424 "child keeps its own cwd-derived project, distinct from the parent",
1425 );
1426 let subagent_meta = child
1427 .session
1428 .options
1429 .get("subagent")
1430 .expect("options.subagent present");
1431 assert_eq!(subagent_meta["hash"], serde_json::json!(agent_hash));
1432 Ok(())
1433 }
1434
1435 #[tokio::test(flavor = "multi_thread")]
1441 async fn unrecognized_subagents_file_fails_visibly_not_merged() -> anyhow::Result<()> {
1442 let corpus = TempDir::new()?;
1443 let project_dir = corpus.path().join("-tmp-pond-test");
1444 let parent_uuid = "55555555-5555-5555-5555-555555555555";
1445 let unknown_dir = project_dir
1446 .join(parent_uuid)
1447 .join("subagents")
1448 .join("workflows")
1449 .join("wf_future01-aaa");
1450 std::fs::create_dir_all(&unknown_dir)?;
1451
1452 let parent_row = serde_json::json!({
1453 "type": "user",
1454 "uuid": "u-parent-only",
1455 "sessionId": parent_uuid,
1456 "cwd": "/tmp/pond-test",
1457 "timestamp": "2026-05-20T00:00:00.000Z",
1458 "message": {"role": "user", "content": "parent message"},
1459 });
1460 std::fs::write(
1461 project_dir.join(format!("{parent_uuid}.jsonl")),
1462 format!("{parent_row}\n"),
1463 )?;
1464
1465 let unknown_row = serde_json::json!({
1468 "type": "user",
1469 "uuid": "u-should-not-merge",
1470 "sessionId": parent_uuid,
1471 "cwd": "/tmp/pond-test",
1472 "timestamp": "2026-05-20T00:02:00.000Z",
1473 "message": {"role": "user", "content": "must not land under parent"},
1474 });
1475 std::fs::write(
1476 unknown_dir.join("transcript-001.jsonl"),
1477 format!("{unknown_row}\n"),
1478 )?;
1479
1480 let store_dir = TempDir::new()?;
1481 let store = Store::open_local(store_dir.path()).await?;
1482 let adapter = ClaudeCodeAdapter::new(corpus.path());
1483 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1484
1485 assert_eq!(
1486 summary.skipped_files, 1,
1487 "the unrecognized subagents/ transcript must be a visible, counted skip",
1488 );
1489 let parent = store
1490 .get_session(parent_uuid)
1491 .await?
1492 .expect("parent session ingests");
1493 assert_eq!(
1494 parent.messages.len(),
1495 1,
1496 "the unrecognized file's row must NOT be merged into the parent session",
1497 );
1498 assert!(
1499 parent
1500 .messages
1501 .iter()
1502 .all(|m| m.message.id() != "u-should-not-merge"),
1503 "parent must not absorb the unrecognized file's message",
1504 );
1505 Ok(())
1506 }
1507
1508 #[tokio::test(flavor = "multi_thread")]
1516 async fn unrecognized_subagents_file_stays_visible_under_parent_watermark() -> anyhow::Result<()>
1517 {
1518 struct ParentAlreadyFresh;
1519 impl crate::adapter::SkipOracle for ParentAlreadyFresh {
1520 fn last_ingested_at(&self, _session_id: &str) -> Option<DateTime<Utc>> {
1521 Some(
1525 DateTime::parse_from_rfc3339("2999-01-01T00:00:00Z")
1526 .unwrap()
1527 .with_timezone(&Utc),
1528 )
1529 }
1530 fn is_empty(&self) -> bool {
1531 false
1532 }
1533 }
1534
1535 let corpus = TempDir::new()?;
1536 let project_dir = corpus.path().join("-tmp-pond-test");
1537 let parent_uuid = "66666666-6666-6666-6666-666666666666";
1538 let unknown_dir = project_dir
1539 .join(parent_uuid)
1540 .join("subagents")
1541 .join("workflows")
1542 .join("wf_future02-bbb");
1543 std::fs::create_dir_all(&unknown_dir)?;
1544
1545 let parent_row = serde_json::json!({
1546 "type": "user",
1547 "uuid": "u-parent-fresh",
1548 "sessionId": parent_uuid,
1549 "cwd": "/tmp/pond-test",
1550 "timestamp": "2026-05-20T00:00:00.000Z",
1551 "message": {"role": "user", "content": "parent message"},
1552 });
1553 std::fs::write(
1554 project_dir.join(format!("{parent_uuid}.jsonl")),
1555 format!("{parent_row}\n"),
1556 )?;
1557
1558 let unknown_row = serde_json::json!({
1561 "type": "user",
1562 "uuid": "u-resync-should-stay-visible",
1563 "sessionId": parent_uuid,
1564 "cwd": "/tmp/pond-test",
1565 "timestamp": "2026-05-20T00:02:00.000Z",
1566 "message": {"role": "user", "content": "must stay visible"},
1567 });
1568 std::fs::write(
1569 unknown_dir.join("transcript-002.jsonl"),
1570 format!("{unknown_row}\n"),
1571 )?;
1572
1573 let store_dir = TempDir::new()?;
1574 let store = Store::open_local(store_dir.path()).await?;
1575 let adapter = ClaudeCodeAdapter::new(corpus.path());
1576 let summary = ingest_adapter(&store, &adapter, &ParentAlreadyFresh, |_| {}).await?;
1577
1578 assert_eq!(
1579 summary.skipped_files, 1,
1580 "the unrecognized transcript must stay a visible Unsupported skip, not be fresh-skipped under the parent's watermark",
1581 );
1582 assert_eq!(
1585 summary.skipped_fresh, 1,
1586 "only the parent may fresh-skip; the unrecognized file must not borrow its watermark",
1587 );
1588 Ok(())
1589 }
1590
1591 #[tokio::test(flavor = "multi_thread")]
1596 async fn replay_duplicates_are_dedup_at_adapter_layer() -> anyhow::Result<()> {
1597 let corpus = TempDir::new()?;
1598 let project_dir = corpus.path().join("-tmp-pond-test");
1599 std::fs::create_dir_all(&project_dir)?;
1600 let session_uuid = "33333333-3333-3333-3333-333333333333";
1601 let dup_uuid = "u-shared-1";
1602 let row = serde_json::json!({
1603 "type": "user",
1604 "uuid": dup_uuid,
1605 "sessionId": session_uuid,
1606 "cwd": "/tmp/pond-test",
1607 "timestamp": "2026-05-16T00:00:00.000Z",
1608 "message": {"role": "user", "content": "replayed three times"},
1609 });
1610 let body = format!("{row}\n{row}\n{row}\n");
1612 std::fs::write(project_dir.join(format!("{session_uuid}.jsonl")), body)?;
1613
1614 let store_dir = TempDir::new()?;
1615 let store = Store::open_local(store_dir.path()).await?;
1616 let adapter = ClaudeCodeAdapter::new(corpus.path());
1617 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1618
1619 assert_eq!(
1620 summary.dropped_events, 0,
1621 "adapter must dedupe replays before they reach the validator"
1622 );
1623 assert!(
1624 !summary
1625 .drop_reasons
1626 .contains_key(crate::sessions::DROP_REASON_DUPLICATE_MESSAGE_ID),
1627 "duplicate_message_id bucket stays empty when adapter does its job"
1628 );
1629 Ok(())
1630 }
1631
1632 #[tokio::test(flavor = "multi_thread")]
1636 async fn tool_result_name_resolves_from_prior_tool_use_in_same_file() -> anyhow::Result<()> {
1637 let corpus = TempDir::new()?;
1638 let project_dir = corpus.path().join("-tmp-pond-test");
1639 std::fs::create_dir_all(&project_dir)?;
1640 let session_uuid = "44444444-4444-4444-4444-444444444444";
1641 let call_id = "toolu_test_01";
1642
1643 let tool_use_row = serde_json::json!({
1644 "type": "assistant",
1645 "uuid": "u-call",
1646 "sessionId": session_uuid,
1647 "cwd": "/tmp/pond-test",
1648 "timestamp": "2026-05-16T00:00:00.000Z",
1649 "message": {
1650 "role": "assistant",
1651 "content": [{
1652 "type": "tool_use",
1653 "id": call_id,
1654 "name": "Edit",
1655 "input": {"file_path": "/tmp/foo"},
1656 }],
1657 },
1658 });
1659 let tool_result_row = serde_json::json!({
1660 "type": "user",
1661 "uuid": "u-result",
1662 "sessionId": session_uuid,
1663 "cwd": "/tmp/pond-test",
1664 "timestamp": "2026-05-16T00:00:01.000Z",
1665 "message": {
1666 "role": "user",
1667 "content": [{
1668 "type": "tool_result",
1669 "tool_use_id": call_id,
1670 "content": "ok",
1671 }],
1672 },
1673 });
1674 std::fs::write(
1675 project_dir.join(format!("{session_uuid}.jsonl")),
1676 format!("{tool_use_row}\n{tool_result_row}\n"),
1677 )?;
1678
1679 let store_dir = TempDir::new()?;
1680 let store = Store::open_local(store_dir.path()).await?;
1681 let adapter = ClaudeCodeAdapter::new(corpus.path());
1682 let _summary =
1683 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1684 let session = store
1685 .get_session(session_uuid)
1686 .await?
1687 .expect("session ingests");
1688
1689 let mut saw_call = false;
1690 let mut saw_result = false;
1691 for stored in &session.messages {
1692 for part in &stored.parts {
1693 match &part.kind {
1694 PartKind::ToolCall {
1695 call_id: cid, name, ..
1696 } => {
1697 assert_eq!(cid.as_ref().map(|e| e.as_str()), Some(call_id));
1698 assert_eq!(
1699 name.as_ref().map(|e| e.as_str()),
1700 Some("Edit"),
1701 "tool_use carries the name directly"
1702 );
1703 saw_call = true;
1704 }
1705 PartKind::ToolResult {
1706 call_id: cid, name, ..
1707 } => {
1708 assert_eq!(cid.as_ref().map(|e| e.as_str()), Some(call_id));
1709 assert_eq!(
1710 name.as_ref().map(|e| e.as_str()),
1711 Some("Edit"),
1712 "tool_result resolves the name via the per-file map (was 'unknown' pre-2026-05-16)"
1713 );
1714 saw_result = true;
1715 }
1716 _ => {}
1717 }
1718 }
1719 }
1720 assert!(saw_call && saw_result, "both parts must be present");
1721 Ok(())
1722 }
1723
#[test]
/// `user_text_provenance` must classify plain user text as
/// `Provenance::Conversational`, and both `<task-notification>`-wrapped text
/// and text on an `isMeta: true` row as `Provenance::Injected`.
fn user_text_provenance_separates_prompts_from_harness_injection() {
    // Plain prompt: no meta flag, no harness wrapper.
    let prompt = json!({"type": "user", "uuid": "u1"});
    assert_eq!(
        user_text_provenance(&prompt, "please refactor the parser"),
        Provenance::Conversational,
    );

    // Harness notification: the row looks like a prompt, only the text differs.
    let notification = json!({"type": "user", "uuid": "u2"});
    assert_eq!(
        user_text_provenance(
            &notification,
            "<task-notification>background task done</task-notification>",
        ),
        Provenance::Injected,
    );

    // Meta row: ordinary-looking text, flagged through `isMeta`.
    let meta = json!({"type": "user", "uuid": "u3", "isMeta": true});
    assert_eq!(
        user_text_provenance(&meta, "expanded skill body"),
        Provenance::Injected,
    );
}
1750
1751 #[tokio::test(flavor = "multi_thread")]
1755 async fn task_notification_message_yields_injected_parts() -> anyhow::Result<()> {
1756 let corpus = TempDir::new()?;
1757 let project_dir = corpus.path().join("-tmp-pond-test");
1758 std::fs::create_dir_all(&project_dir)?;
1759 let session_uuid = "66666666-6666-6666-6666-666666666666";
1760 let prompt = serde_json::json!({
1761 "type": "user",
1762 "uuid": "u-prompt",
1763 "sessionId": session_uuid,
1764 "cwd": "/tmp/pond-test",
1765 "timestamp": "2026-05-16T00:00:00.000Z",
1766 "message": {"role": "user", "content": "genuine human prompt"},
1767 });
1768 let notification = serde_json::json!({
1769 "type": "user",
1770 "uuid": "u-notify",
1771 "sessionId": session_uuid,
1772 "cwd": "/tmp/pond-test",
1773 "timestamp": "2026-05-16T00:00:01.000Z",
1774 "message": {
1775 "role": "user",
1776 "content": "<task-notification>a background task finished</task-notification>",
1777 },
1778 });
1779 std::fs::write(
1780 project_dir.join(format!("{session_uuid}.jsonl")),
1781 format!("{prompt}\n{notification}\n"),
1782 )?;
1783
1784 let store_dir = TempDir::new()?;
1785 let store = Store::open_local(store_dir.path()).await?;
1786 let adapter = ClaudeCodeAdapter::new(corpus.path());
1787 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1788
1789 let session = store
1790 .get_session(session_uuid)
1791 .await?
1792 .expect("session ingests");
1793 let mut saw_prompt = false;
1794 let mut saw_notification = false;
1795 for stored in &session.messages {
1796 for part in &stored.parts {
1797 if stored.message.id() == "u-prompt" {
1798 assert_eq!(part.provenance, crate::wire::Provenance::Conversational);
1799 saw_prompt = true;
1800 }
1801 if stored.message.id() == "u-notify" {
1802 assert_eq!(part.provenance, crate::wire::Provenance::Injected);
1803 saw_notification = true;
1804 }
1805 }
1806 }
1807 assert!(saw_prompt && saw_notification, "both messages present");
1808 Ok(())
1809 }
1810
1811 #[tokio::test(flavor = "multi_thread")]
1815 async fn orphan_tool_result_yields_name_none_not_unknown_sentinel() -> anyhow::Result<()> {
1816 let corpus = TempDir::new()?;
1817 let project_dir = corpus.path().join("-tmp-pond-test");
1818 std::fs::create_dir_all(&project_dir)?;
1819 let session_uuid = "55555555-5555-5555-5555-555555555555";
1820
1821 let row = serde_json::json!({
1823 "type": "user",
1824 "uuid": "u-orphan",
1825 "sessionId": session_uuid,
1826 "cwd": "/tmp/pond-test",
1827 "timestamp": "2026-05-16T00:00:00.000Z",
1828 "message": {
1829 "role": "user",
1830 "content": [{
1831 "type": "tool_result",
1832 "tool_use_id": "toolu_orphan",
1833 "content": "result body, no matching call",
1834 }],
1835 },
1836 });
1837 std::fs::write(
1838 project_dir.join(format!("{session_uuid}.jsonl")),
1839 format!("{row}\n"),
1840 )?;
1841
1842 let store_dir = TempDir::new()?;
1843 let store = Store::open_local(store_dir.path()).await?;
1844 let adapter = ClaudeCodeAdapter::new(corpus.path());
1845 let _summary =
1846 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1847 let session = store
1848 .get_session(session_uuid)
1849 .await?
1850 .expect("session ingests");
1851 let mut found = false;
1852 for stored in &session.messages {
1853 for part in &stored.parts {
1854 if let PartKind::ToolResult { name, call_id, .. } = &part.kind {
1855 assert_eq!(call_id.as_ref().map(|e| e.as_str()), Some("toolu_orphan"));
1856 assert!(
1857 name.is_none(),
1858 "orphan tool_result must be name=None, not synthesized 'unknown'",
1859 );
1860 found = true;
1861 }
1862 }
1863 }
1864 assert!(found, "orphan tool_result part must be present");
1865 Ok(())
1867 }
1868
#[test]
/// A `journal.jsonl` under `subagents/workflows/<wf>/` must be recognized by
/// `is_workflow_control_file` and must yield no `unsupported_reason`.
fn workflow_journal_is_a_control_file_not_unsupported() {
    let journal_path = std::path::Path::new(
        "/root/-proj/55555555-5555-5555-5555-555555555555/subagents/workflows/wf_030e6487-da6/journal.jsonl",
    );
    let adapter = ClaudeCodeAdapter::new("/tmp/pond-test-root");

    assert!(is_workflow_control_file(journal_path));

    let reason = adapter.unsupported_reason(journal_path);
    assert!(
        reason.is_none(),
        "journal.jsonl is a known control file, not an unsupported layout",
    );
}
1884
#[test]
/// An unrecognized leaf under `subagents/workflows/` must report an
/// `unsupported_reason` and not count as a control file, while an `agent-*`
/// transcript directly under `subagents/` must report none.
fn unknown_subagents_leaf_is_still_unsupported() {
    let unknown_leaf = std::path::Path::new(
        "/root/-proj/PARENT/subagents/workflows/wf_x/transcript-001.jsonl",
    );
    let agent_transcript =
        std::path::Path::new("/root/-proj/PARENT/subagents/agent-abc123def456.jsonl");
    let adapter = ClaudeCodeAdapter::new("/tmp/pond-test-root");

    let unknown_reason = adapter.unsupported_reason(unknown_leaf);
    assert!(
        unknown_reason.is_some(),
        "an unrecognized non-agent, non-journal leaf must still fail visibly",
    );
    assert!(!is_workflow_control_file(unknown_leaf));

    let agent_reason = adapter.unsupported_reason(agent_transcript);
    assert!(
        agent_reason.is_none(),
        "a recognized agent transcript is resolvable, not unsupported",
    );
}
1906
1907 #[tokio::test(flavor = "multi_thread")]
1912 async fn workflow_journal_skipped_benignly_while_sibling_agent_ingests() -> anyhow::Result<()> {
1913 let corpus = TempDir::new()?;
1914 let project_dir = corpus.path().join("-tmp-pond-test");
1915 let parent_uuid = "77777777-7777-7777-7777-777777777777";
1916 let wf_id = "wf_030e6487-da6";
1917 let agent_hash = "a38f4724ef3864da8";
1918 let wf_dir = project_dir
1919 .join(parent_uuid)
1920 .join("subagents")
1921 .join("workflows")
1922 .join(wf_id);
1923 std::fs::create_dir_all(&wf_dir)?;
1924
1925 let parent_row = serde_json::json!({
1926 "type": "user",
1927 "uuid": "u-parent-1",
1928 "sessionId": parent_uuid,
1929 "cwd": "/tmp/pond-test",
1930 "timestamp": "2026-06-04T00:00:00.000Z",
1931 "message": {"role": "user", "content": "hi parent"},
1932 });
1933 std::fs::write(
1934 project_dir.join(format!("{parent_uuid}.jsonl")),
1935 format!("{parent_row}\n"),
1936 )?;
1937
1938 let agent_row = serde_json::json!({
1939 "type": "user",
1940 "uuid": "u-agent-1",
1941 "sessionId": parent_uuid,
1942 "cwd": "/tmp/pond-test",
1943 "timestamp": "2026-06-04T00:01:00.000Z",
1944 "message": {"role": "user", "content": "workflow agent prompt"},
1945 });
1946 std::fs::write(
1947 wf_dir.join(format!("agent-{agent_hash}.jsonl")),
1948 format!("{agent_row}\n"),
1949 )?;
1950
1951 std::fs::write(
1953 wf_dir.join("journal.jsonl"),
1954 "{\"type\":\"started\",\"key\":\"v2:abc\",\"agentId\":\"a38f\"}\n\
1955 {\"type\":\"result\",\"key\":\"v2:abc\",\"agentId\":\"a38f\",\"result\":{}}\n",
1956 )?;
1957
1958 let store_dir = TempDir::new()?;
1959 let store = Store::open_local(store_dir.path()).await?;
1960 let adapter = ClaudeCodeAdapter::new(corpus.path());
1961 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1962 assert_eq!(
1963 summary.skipped_files, 0,
1964 "journal.jsonl is a control file (benign Empty skip), not an unsupported failure",
1965 );
1966
1967 let child = store
1968 .get_session(&format!(
1969 "{parent_uuid}/workflows/{wf_id}/agent-{agent_hash}"
1970 ))
1971 .await?
1972 .expect("the sibling agent transcript still ingests as a child session");
1973 assert_eq!(
1974 child.session.parent_session_id.as_deref(),
1975 Some(parent_uuid)
1976 );
1977
1978 let parent = store
1979 .get_session(parent_uuid)
1980 .await?
1981 .expect("parent session ingests");
1982 assert_eq!(
1983 parent.messages.len(),
1984 1,
1985 "journal rows must NOT merge into the parent session",
1986 );
1987 Ok(())
1988 }
1989
1990 #[tokio::test(flavor = "multi_thread")]
1994 async fn workflow_journal_with_parent_sessionid_still_not_merged() -> anyhow::Result<()> {
1995 let corpus = TempDir::new()?;
1996 let project_dir = corpus.path().join("-tmp-pond-test");
1997 let parent_uuid = "88888888-8888-8888-8888-888888888888";
1998 let wf_dir = project_dir
1999 .join(parent_uuid)
2000 .join("subagents")
2001 .join("workflows")
2002 .join("wf_abc01234-def");
2003 std::fs::create_dir_all(&wf_dir)?;
2004
2005 let parent_row = serde_json::json!({
2006 "type": "user",
2007 "uuid": "u-parent",
2008 "sessionId": parent_uuid,
2009 "cwd": "/tmp/pond-test",
2010 "timestamp": "2026-06-04T00:00:00.000Z",
2011 "message": {"role": "user", "content": "parent only"},
2012 });
2013 std::fs::write(
2014 project_dir.join(format!("{parent_uuid}.jsonl")),
2015 format!("{parent_row}\n"),
2016 )?;
2017
2018 let journal_row = serde_json::json!({
2021 "type": "started",
2022 "key": "v2:abc",
2023 "agentId": "a1",
2024 "sessionId": parent_uuid,
2025 "message": {"role": "user", "content": "must not merge"},
2026 });
2027 std::fs::write(wf_dir.join("journal.jsonl"), format!("{journal_row}\n"))?;
2028
2029 let store_dir = TempDir::new()?;
2030 let store = Store::open_local(store_dir.path()).await?;
2031 let adapter = ClaudeCodeAdapter::new(corpus.path());
2032 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
2033 assert_eq!(
2034 summary.skipped_files, 0,
2035 "journal is a benign Empty skip, not an unsupported failure",
2036 );
2037 let parent = store
2038 .get_session(parent_uuid)
2039 .await?
2040 .expect("parent session ingests");
2041 assert_eq!(
2042 parent.messages.len(),
2043 1,
2044 "journal row must NOT merge even when it carries the parent sessionId",
2045 );
2046 Ok(())
2047 }
2048}