1use std::{
10 collections::{HashMap, HashSet},
11 path::{Path, PathBuf},
12};
13
14use anyhow::Context as _;
15use chrono::{DateTime, SecondsFormat, Utc};
16use serde_json::{Value, json};
17
18use crate::{
19 sessions::IngestEvent,
20 wire::{FileData, Message, Part, PartKind, Provenance, ProviderOptions, Session},
21};
22
23use super::{
24 Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
25 RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
26 empty_options,
27 extract::{
28 Extracted, Source, extract_compact_repr, extract_raw_record, extract_self_str, extract_str,
29 },
30 extracted_text,
31 jsonl::{BoundedRow, JsonlTree, jsonl_tree_discover, jsonl_tree_events, source_line},
32 jsonl_bytes, part_id, part_ordinal, raw_record,
33};
34
/// Per-file scratch state threaded through the rows of one JSONL file while they are turned
/// into ingest events.
#[derive(Debug, Default)]
pub(crate) struct FileState {
    /// `uuid` values of rows already processed; a row whose uuid is already present is skipped.
    seen_uuids: HashSet<String>,
    /// Tool name recorded for each `tool_use` / `server_tool_use` block id, so a later
    /// `tool_result` can be labelled with the name of the call it answers.
    tool_call_names: HashMap<String, Extracted<String>>,
}
60
/// Adapter name: returned by both `name()` implementations and passed to `config_path` and
/// `AdapterError::schema` so errors are attributed to this adapter.
const NAME: &str = "claude-code";
65
/// Stateless factory for the Claude Code adapter; its behaviour lives in the `AdapterFactory` impl.
pub struct ClaudeCodeFactory;
69
70impl AdapterFactory for ClaudeCodeFactory {
71 fn name(&self) -> &'static str {
72 NAME
73 }
74
75 fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
76 Ok(Box::new(ClaudeCodeAdapter::new(config_path(NAME, config)?)))
77 }
78
79 fn probe_default(&self, env: &Env) -> Option<Value> {
80 let path = env.home.join(".claude").join("projects");
81 path.exists().then(|| json!({ "path": path }))
82 }
83
84 fn serialize(
85 &self,
86 session: &crate::sessions::SessionWithMessages,
87 fidelity: RestoreFidelity,
88 ) -> Result<Vec<RestoredFile>, AdapterError> {
89 serialize_session(session, fidelity)
90 }
91}
92
93fn serialize_session(
94 session: &crate::sessions::SessionWithMessages,
95 fidelity: RestoreFidelity,
96) -> Result<Vec<RestoredFile>, AdapterError> {
97 let mut messages = session.messages.clone();
98 if fidelity == RestoreFidelity::Native {
99 messages.sort_by(|left, right| {
100 source_line(left.message.options())
101 .cmp(&source_line(right.message.options()))
102 .then_with(|| by_timestamp_then_id(left, right))
103 });
104 } else {
105 messages.sort_by(by_timestamp_then_id);
106 }
107 let mut records = Vec::with_capacity(messages.len());
111 let mut parent_uuid = None::<String>;
112 for message in &messages {
113 if fidelity == RestoreFidelity::Native
114 && let Some(raw) = raw_record(message.message.options())
115 {
116 parent_uuid = raw
117 .get("uuid")
118 .and_then(Value::as_str)
119 .map(ToOwned::to_owned)
120 .or(parent_uuid);
121 records.push(raw);
122 continue;
123 }
124 let Some(record) = claude_record(session, message, parent_uuid.as_deref()) else {
127 continue;
128 };
129 parent_uuid = record
130 .get("uuid")
131 .and_then(Value::as_str)
132 .map(ToOwned::to_owned);
133 records.push(record);
134 }
135
136 let mut files = vec![RestoredFile::new(
137 claude_relative_path(session),
138 jsonl_bytes(NAME, &records)?,
139 fidelity,
140 )];
141 if session.session.parent_session_id.is_some()
142 && let Some(meta) = subagent_meta_record(session)
143 {
144 let mut meta_path = files[0].relative_path.clone();
145 meta_path.set_extension("meta.json");
146 files.push(RestoredFile::new(
147 meta_path,
148 serde_json::to_vec(&meta).map_err(|err| {
149 AdapterError::schema(
150 NAME,
151 &session.session.id,
152 format!("json encode failed: {err}"),
153 )
154 })?,
155 fidelity,
156 ));
157 }
158 Ok(files)
159}
160
161fn claude_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
162 let encoded_project = session
163 .session
164 .options
165 .get("source")
166 .and_then(|source| source.get("project_dir"))
167 .and_then(Value::as_str)
168 .map(ToOwned::to_owned)
169 .unwrap_or_else(|| encode_project(&session.session.project));
170 if let Some(parent) = &session.session.parent_session_id {
171 let child_suffix = session
176 .session
177 .id
178 .strip_prefix(&format!("{parent}/"))
179 .unwrap_or(&session.session.id);
180 return PathBuf::from(encoded_project)
181 .join(parent)
182 .join("subagents")
183 .join(format!("{child_suffix}.jsonl"));
184 }
185 PathBuf::from(encoded_project).join(format!("{}.jsonl", session.session.id))
186}
187
/// Turns a project path into a directory name by replacing every `/` and `.` with `-`.
fn encode_project(project: &str) -> String {
    project
        .chars()
        .map(|ch| if matches!(ch, '/' | '.') { '-' } else { ch })
        .collect()
}
191
192fn subagent_meta_record(session: &crate::sessions::SessionWithMessages) -> Option<Value> {
193 let meta = session.session.options.get("subagent")?.get("meta")?;
197 meta.is_object().then(|| meta.clone())
198}
199
200fn claude_record(
201 session: &crate::sessions::SessionWithMessages,
202 message: &crate::sessions::MessageWithParts,
203 parent_uuid: Option<&str>,
204) -> Option<Value> {
205 let row_role = match &message.message {
213 Message::System { .. } => return None,
214 Message::User { .. } | Message::Tool { .. } => "user",
215 Message::Assistant { .. } => "assistant",
216 };
217 let mut envelope = serde_json::Map::new();
218 envelope.insert("role".to_owned(), Value::String(row_role.to_owned()));
219 if row_role == "assistant" {
220 envelope.insert("type".to_owned(), Value::String("message".to_owned()));
223 }
224 envelope.insert(
225 "content".to_owned(),
226 Value::Array(message.parts.iter().map(claude_part).collect()),
227 );
228 Some(json!({
229 "parentUuid": parent_uuid,
230 "isSidechain": false,
231 "userType": "external",
232 "cwd": &*session.session.project,
233 "sessionId": &session.session.id,
234 "type": row_role,
235 "message": Value::Object(envelope),
236 "uuid": message.message.id(),
237 "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
238 }))
239}
240
241fn claude_part(part: &Part) -> Value {
242 match &part.kind {
243 PartKind::Text { text } => json!({"type": "text", "text": extracted_text(text)}),
244 PartKind::Reasoning { text } => {
245 json!({"type": "thinking", "thinking": extracted_text(text)})
246 }
247 PartKind::ToolCall {
248 call_id,
249 name,
250 params,
251 provider_executed,
252 } => json!({
253 "type": if *provider_executed { "server_tool_use" } else { "tool_use" },
254 "id": extracted_text(call_id),
255 "name": extracted_text(name),
256 "input": params,
257 }),
258 PartKind::ToolResult {
259 call_id,
260 is_failure,
261 result,
262 ..
263 } => json!({
264 "type": "tool_result",
265 "tool_use_id": extracted_text(call_id),
266 "is_error": is_failure,
267 "content": result,
268 }),
269 PartKind::File {
270 media_type,
271 file_name,
272 data,
273 } => json!({
274 "type": "file",
275 "media_type": media_type,
276 "file_name": file_name,
277 "source": file_source(data),
278 }),
279 other => {
280 json!({"type": "text", "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null))})
281 }
282 }
283}
284
285fn file_source(data: &FileData) -> Value {
286 match data {
287 FileData::String(value) => json!({"type": "text", "data": value}),
288 FileData::Bytes(value) => json!({"type": "base64", "data": value}),
289 FileData::Url(value) => json!({"type": "url", "url": value}),
290 }
291}
292
/// Adapter over a directory tree of Claude Code JSONL transcripts.
#[derive(Debug, Clone)]
pub struct ClaudeCodeAdapter {
    /// Directory returned by `JsonlTree::root`; `probe_default` proposes `<home>/.claude/projects`.
    root: PathBuf,
}
299
300impl ClaudeCodeAdapter {
301 pub fn new(root: impl Into<PathBuf>) -> Self {
302 Self { root: root.into() }
303 }
304}
305
/// Both entry points delegate to the shared JSONL-tree helpers, which call back into this
/// type's `JsonlTree` implementation.
impl Adapter for ClaudeCodeAdapter {
    /// Discovery is delegated to `jsonl_tree_discover`.
    fn discover(&self) -> DiscoverFuture<'_> {
        jsonl_tree_discover(self)
    }

    /// Event streaming is delegated to `jsonl_tree_events`, forwarding the caller's `oracle`.
    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
        jsonl_tree_events(self, oracle)
    }
}
315
316impl JsonlTree for ClaudeCodeAdapter {
317 type State = FileState;
318
319 fn name(&self) -> &'static str {
320 NAME
321 }
322
323 fn root(&self) -> &Path {
324 &self.root
325 }
326
327 fn peek_session_id(&self, path: &Path, first_line: &str) -> Option<String> {
328 if subagents_dir(path).is_some() {
335 let (parent_uuid, child_suffix, _) = subagent_ids(path)?;
336 return Some(format!("{parent_uuid}/{child_suffix}"));
337 }
338 let row: Value = serde_json::from_str(first_line).ok()?;
339 row.get("sessionId")?.as_str().map(ToOwned::to_owned)
340 }
341
342 fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
343 session_from_rows(path, rows)
344 }
345
346 fn events_from_row(
347 &self,
348 session: &Session,
349 row: &BoundedRow,
350 state: &mut Self::State,
351 ) -> Result<Vec<IngestEvent>, String> {
352 if let Some(uuid) = row.value.get("uuid").and_then(Value::as_str)
353 && !state.seen_uuids.insert(uuid.to_owned())
354 {
355 return Ok(Vec::new());
356 }
357 capture_tool_call_names(&row.value, &mut state.tool_call_names);
358 events_from_row(&session.id, row.line, &row.value, session.created_at, state)
359 }
360
361 fn unsupported_reason(&self, path: &Path) -> Option<String> {
362 if subagents_dir(path).is_some()
371 && subagent_ids(path).is_none()
372 && !is_workflow_control_file(path)
373 {
374 return Some(format!(
375 "{}: subagent transcript layout not recognized by this pond version; \
376 skipped so it is not merged into the parent session - update pond and \
377 re-run `pond sync`",
378 path.display()
379 ));
380 }
381 None
382 }
383}
384
385fn is_workflow_control_file(path: &Path) -> bool {
391 subagents_dir(path).is_some()
392 && path.file_name().and_then(|n| n.to_str()) == Some("journal.jsonl")
393}
394
395fn capture_tool_call_names(row: &Value, map: &mut HashMap<String, Extracted<String>>) {
400 let Some(items) = row
401 .get("message")
402 .and_then(|message| message.get("content"))
403 .and_then(Value::as_array)
404 else {
405 return;
406 };
407 for item in items {
408 let kind = item.get("type").and_then(Value::as_str);
409 if !matches!(kind, Some("tool_use") | Some("server_tool_use")) {
410 continue;
411 }
412 let (Some(id), Some(name)) = (item.str_field("id"), extract_str(item, "name")) else {
413 continue;
414 };
415 map.insert(id.to_owned(), name);
416 }
417}
418
419fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
420 let path_display = path.display().to_string();
421 if subagents_dir(path).is_some() && subagent_ids(path).is_none() {
426 return Err(AdapterError::schema(
427 NAME,
428 path_display,
429 "sidecar/control file under subagents/ has no session of its own",
430 ));
431 }
432 let mut created_at = None;
433 let mut project: Option<Extracted<String>> = None;
434 let mut version = None;
435 for row in rows {
436 if created_at.is_none() {
437 created_at = parse_timestamp(&row.value).ok();
438 }
439 if project.is_none() {
440 project = extract_str(&row.value, "cwd");
441 }
442 if version.is_none() {
443 version = row
444 .value
445 .get("version")
446 .and_then(Value::as_str)
447 .map(ToOwned::to_owned);
448 }
449 }
450
451 let first = rows
452 .first()
453 .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
454 let at_first = format!("{path_display}:{}", first.line);
455 let raw_session_id = first
456 .value
457 .get("sessionId")
458 .and_then(Value::as_str)
459 .ok_or_else(|| {
460 AdapterError::schema(
461 NAME,
462 at_first.clone(),
463 format!("line {} missing sessionId", first.line),
464 )
465 })?
466 .to_owned();
467 let created_at = created_at.ok_or_else(|| {
468 AdapterError::schema(NAME, at_first, "session has no parseable timestamp")
469 })?;
470
471 let subagent = subagent_descriptor(path);
484 let project_dir = source_project_dir(path, subagent.is_some());
485 let (session_id, parent_session_id, source_agent, subagent_options) = match subagent {
486 Some(SubagentDescriptor {
487 parent_uuid,
488 child_suffix,
489 agent_hash,
490 agent_type,
491 meta,
492 }) => {
493 let child_id = format!("{parent_uuid}/{child_suffix}");
494 let agent_label = agent_type
495 .as_deref()
496 .map(|t| format!("claude-code/{t}"))
497 .unwrap_or_else(|| "claude-code/subagent".to_owned());
498 let metadata = json!({
502 "hash": agent_hash,
503 "raw_session_id": raw_session_id,
504 "meta": meta,
505 });
506 (child_id, Some(parent_uuid), agent_label, Some(metadata))
507 }
508 None => (raw_session_id, None, "claude-code".to_owned(), None),
509 };
510
511 let project = match project {
512 Some(value) => value,
513 None => {
514 let decoded = path
515 .parent()
516 .and_then(|p| p.file_name())
517 .and_then(|n| n.to_str())
518 .map(|s| s.replace('-', "/"))
519 .ok_or_else(|| {
520 AdapterError::schema(
521 NAME,
522 path_display.clone(),
523 "no `cwd` field in any row and source path is not UTF-8",
524 )
525 })?;
526 extract_self_str(&Value::String(decoded)).ok_or_else(|| {
527 AdapterError::schema(
528 NAME,
529 path_display.clone(),
530 "internal: Value::String produced None from Source::as_str",
531 )
532 })?
533 }
534 };
535
536 let mut options = ProviderOptions::new();
537 options.insert(
538 "source".to_owned(),
539 json!({
540 "adapter": "claude-code",
541 "version": version,
542 "project_dir": project_dir,
543 "workspace_path": &*project,
544 }),
545 );
546 if let Some(metadata) = subagent_options {
547 options.insert("subagent".to_owned(), metadata);
548 }
549
550 Ok(Session {
551 id: session_id,
552 parent_session_id,
553 parent_message_id: None,
554 source_agent,
555 created_at,
556 project,
557 options,
558 })
559}
560
561fn source_project_dir(path: &Path, is_subagent: bool) -> Option<String> {
562 let project_dir = if is_subagent {
567 subagents_dir(path)?.parent()?.parent()
568 } else {
569 path.parent()
570 };
571 project_dir
572 .and_then(|p| p.file_name())
573 .and_then(|n| n.to_str())
574 .map(ToOwned::to_owned)
575}
576
/// Finds the nearest strict ancestor of `path` whose final component is `subagents`.
///
/// `path` itself is never considered, only its parent and the directories above it.
fn subagents_dir(path: &Path) -> Option<&Path> {
    path.ancestors()
        .skip(1) // the first ancestor is `path` itself
        .find(|dir| dir.file_name().and_then(|name| name.to_str()) == Some("subagents"))
}
591
/// Identity of a subagent transcript, derived from its path plus the optional sibling
/// `agent-<hash>.meta.json` file.
struct SubagentDescriptor {
    /// Name of the directory that contains the `subagents` directory.
    parent_uuid: String,
    /// Path of the transcript relative to `subagents`, without extension, `/`-separated.
    child_suffix: String,
    /// The `<hash>` portion of the `agent-<hash>.jsonl` file name.
    agent_hash: String,
    /// `agentType` string from the meta file, when the file was readable and had one.
    agent_type: Option<String>,
    /// Parsed content of the meta file, when it was readable and valid JSON.
    meta: Option<Value>,
}
604
605fn subagent_ids(path: &Path) -> Option<(String, String, String)> {
613 let file_name = path.file_name()?.to_str()?;
614 let agent_hash = file_name
615 .strip_prefix("agent-")?
616 .strip_suffix(".jsonl")?
617 .to_owned();
618 let subagents = subagents_dir(path)?;
619 let parent_uuid = subagents.parent()?.file_name()?.to_str()?.to_owned();
620 let child_suffix = path
624 .strip_prefix(subagents)
625 .ok()?
626 .with_extension("")
627 .to_str()?
628 .replace(std::path::MAIN_SEPARATOR, "/");
629 Some((parent_uuid, child_suffix, agent_hash))
630}
631
632fn subagent_descriptor(path: &Path) -> Option<SubagentDescriptor> {
635 let (parent_uuid, child_suffix, agent_hash) = subagent_ids(path)?;
636 let meta_path = path.parent()?.join(format!("agent-{agent_hash}.meta.json"));
637 let (agent_type, meta) = match std::fs::read(&meta_path) {
638 Ok(bytes) => match serde_json::from_slice::<Value>(&bytes) {
639 Ok(value) => (
640 value
641 .get("agentType")
642 .and_then(Value::as_str)
643 .map(ToOwned::to_owned),
644 Some(value),
645 ),
646 Err(error) => {
647 tracing::debug!(
648 target: "pond::adapter::claude_code",
649 meta = %meta_path.display(),
650 %error,
651 "subagent .meta.json present but unparseable; falling back to 'claude-code/subagent'",
652 );
653 (None, None)
654 }
655 },
656 Err(error) if error.kind() == std::io::ErrorKind::NotFound => (None, None),
657 Err(error) => {
658 tracing::debug!(
659 target: "pond::adapter::claude_code",
660 meta = %meta_path.display(),
661 %error,
662 "subagent .meta.json IO error; falling back to 'claude-code/subagent'",
663 );
664 (None, None)
665 }
666 };
667
668 Some(SubagentDescriptor {
669 parent_uuid,
670 child_suffix,
671 agent_hash,
672 agent_type,
673 meta,
674 })
675}
676
677fn events_from_row(
678 session_id: &str,
679 line: usize,
680 row: &Value,
681 default_timestamp: DateTime<Utc>,
682 state: &FileState,
683) -> Result<Vec<IngestEvent>, String> {
684 let timestamp = parse_timestamp(row).unwrap_or(default_timestamp);
685 let uuid = row
686 .get("uuid")
687 .and_then(Value::as_str)
688 .map_or_else(|| format!("{session_id}:{line}"), ToOwned::to_owned);
689
690 if let Some(message_value) = row.get("message") {
691 return message_events(
692 session_id,
693 &uuid,
694 timestamp,
695 row,
696 message_value,
697 state,
698 line,
699 );
700 }
701
702 let raw_type = row.get("type").and_then(Value::as_str);
709 let content = if raw_type == Some("attachment") {
710 row.get("attachment")
711 .and_then(attachment_content)
712 .or_else(|| Some(extract_compact_repr(row)))
713 } else {
714 extract_str(row, "subtype").or_else(|| extract_str(row, "type"))
715 };
716 let message = Message::System {
717 id: uuid,
718 session_id: session_id.to_owned(),
719 timestamp,
720 content,
721 options: row_options(row, line),
722 };
723 Ok(vec![IngestEvent::Message(message)])
724}
725
726fn message_events(
727 session_id: &str,
728 uuid: &str,
729 timestamp: DateTime<Utc>,
730 row: &Value,
731 message_value: &Value,
732 state: &FileState,
733 line: usize,
734) -> Result<Vec<IngestEvent>, String> {
735 let role = message_value
736 .get("role")
737 .and_then(Value::as_str)
738 .ok_or_else(|| "message missing role".to_owned())?;
739 let content = message_value.get("content").unwrap_or(&Value::Null);
740 let mut parts = Vec::new();
741 let message = match (role, content) {
742 ("user", Value::String(text)) => {
743 let provenance = user_text_provenance(row, text);
747 parts.push(text_part(
748 session_id,
749 uuid,
750 0,
751 extract_self_str(content),
752 provenance,
753 ));
754 Message::User {
755 id: uuid.to_owned(),
756 session_id: session_id.to_owned(),
757 timestamp,
758 options: row_options(row, line),
759 }
760 }
761 ("user", Value::Array(items)) if items.iter().all(is_tool_result) => {
762 let source_tool_result = row.get("toolUseResult").cloned();
763 parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
764 tool_result_part(
765 session_id,
766 uuid,
767 ordinal,
768 item,
769 source_tool_result.as_ref(),
770 state,
771 )
772 }));
773 Message::Tool {
774 id: uuid.to_owned(),
775 session_id: session_id.to_owned(),
776 timestamp,
777 options: row_options(row, line),
778 }
779 }
780 ("user", Value::Array(items)) => {
781 let provenance = user_array_provenance(row, items);
784 parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
785 user_part(session_id, uuid, ordinal, item, state, provenance)
786 }));
787 Message::User {
788 id: uuid.to_owned(),
789 session_id: session_id.to_owned(),
790 timestamp,
791 options: row_options(row, line),
792 }
793 }
794 ("assistant", Value::Array(items)) => {
795 parts.extend(
796 items
797 .iter()
798 .enumerate()
799 .map(|(ordinal, item)| assistant_part(session_id, uuid, ordinal, item)),
800 );
801 Message::Assistant {
802 id: uuid.to_owned(),
803 session_id: session_id.to_owned(),
804 timestamp,
805 options: assistant_options(row, message_value, line),
806 }
807 }
808 ("system", Value::String(_)) => Message::System {
809 id: uuid.to_owned(),
810 session_id: session_id.to_owned(),
811 timestamp,
812 content: extract_self_str(content),
813 options: row_options(row, line),
814 },
815 ("system", _) => Message::System {
816 id: uuid.to_owned(),
817 session_id: session_id.to_owned(),
818 timestamp,
819 content: Some(extract_compact_repr(message_value)),
824 options: row_options(row, line),
825 },
826 (other, _) => {
827 return Err(format!("unsupported message role {other}"));
828 }
829 };
830
831 let mut events = Vec::with_capacity(parts.len() + 1);
832 events.push(IngestEvent::Message(message));
833 events.extend(parts.into_iter().map(IngestEvent::Part));
834 Ok(events)
835}
836
837fn text_part(
838 session_id: &str,
839 message_id: &str,
840 ordinal: usize,
841 text: Option<Extracted<String>>,
842 provenance: Provenance,
843) -> Part {
844 Part {
845 session_id: session_id.to_owned(),
846 id: part_id(message_id, ordinal),
847 message_id: message_id.to_owned(),
848 ordinal: part_ordinal(ordinal),
849 provenance,
850 options: empty_options(),
851 kind: PartKind::Text { text },
852 }
853}
854
855fn user_part(
856 session_id: &str,
857 message_id: &str,
858 ordinal: usize,
859 value: &Value,
860 state: &FileState,
861 provenance: Provenance,
862) -> Part {
863 match value.get("type").and_then(Value::as_str) {
864 Some("text") => text_part(
865 session_id,
866 message_id,
867 ordinal,
868 extract_str(value, "text"),
869 provenance,
870 ),
871 Some("image") | Some("file") => {
872 file_part(session_id, message_id, ordinal, value, provenance)
873 }
874 Some("tool_result") => {
875 tool_result_part(session_id, message_id, ordinal, value, None, state)
876 }
877 _ => text_part(
881 session_id,
882 message_id,
883 ordinal,
884 Some(extract_compact_repr(value)),
885 provenance,
886 ),
887 }
888}
889
890fn assistant_part(session_id: &str, message_id: &str, ordinal: usize, value: &Value) -> Part {
891 match value.get("type").and_then(Value::as_str) {
895 Some("text") => text_part(
896 session_id,
897 message_id,
898 ordinal,
899 extract_str(value, "text"),
900 Provenance::Conversational,
901 ),
902 Some("thinking") => Part {
903 session_id: session_id.to_owned(),
904 id: part_id(message_id, ordinal),
905 message_id: message_id.to_owned(),
906 ordinal: part_ordinal(ordinal),
907 provenance: Provenance::Conversational,
908 options: signature_options(value),
909 kind: PartKind::Reasoning {
910 text: extract_str(value, "thinking"),
911 },
912 },
913 Some("tool_use") => Part {
914 session_id: session_id.to_owned(),
915 id: part_id(message_id, ordinal),
916 message_id: message_id.to_owned(),
917 ordinal: part_ordinal(ordinal),
918 provenance: Provenance::Conversational,
919 options: empty_options(),
920 kind: PartKind::ToolCall {
921 call_id: extract_str(value, "id"),
922 name: extract_str(value, "name"),
923 params: value.get("input").cloned().unwrap_or(Value::Null),
924 provider_executed: false,
925 },
926 },
927 Some("server_tool_use") => Part {
928 session_id: session_id.to_owned(),
929 id: part_id(message_id, ordinal),
930 message_id: message_id.to_owned(),
931 ordinal: part_ordinal(ordinal),
932 provenance: Provenance::Conversational,
933 options: empty_options(),
934 kind: PartKind::ToolCall {
935 call_id: extract_str(value, "id"),
936 name: extract_str(value, "name"),
937 params: value.get("input").cloned().unwrap_or(Value::Null),
938 provider_executed: true,
939 },
940 },
941 Some("image") | Some("file") => file_part(
942 session_id,
943 message_id,
944 ordinal,
945 value,
946 Provenance::Conversational,
947 ),
948 _ => text_part(
951 session_id,
952 message_id,
953 ordinal,
954 Some(extract_compact_repr(value)),
955 Provenance::Conversational,
956 ),
957 }
958}
959
960fn tool_result_part(
961 session_id: &str,
962 message_id: &str,
963 ordinal: usize,
964 value: &Value,
965 source_tool_result: Option<&Value>,
966 state: &FileState,
967) -> Part {
968 let call_id = extract_str(value, "tool_use_id");
969 let name = value
975 .str_field("tool_use_id")
976 .and_then(|id| state.tool_call_names.get(id))
977 .cloned();
978 let result = value
979 .get("content")
980 .cloned()
981 .or_else(|| source_tool_result.cloned())
982 .unwrap_or(Value::Null);
983 Part {
984 session_id: session_id.to_owned(),
985 id: part_id(message_id, ordinal),
986 message_id: message_id.to_owned(),
987 ordinal: part_ordinal(ordinal),
988 provenance: Provenance::Injected,
991 options: empty_options(),
992 kind: PartKind::ToolResult {
993 call_id,
994 name,
995 is_failure: value
996 .get("is_error")
997 .and_then(Value::as_bool)
998 .unwrap_or(false),
999 result,
1000 },
1001 }
1002}
1003
1004fn file_part(
1005 session_id: &str,
1006 message_id: &str,
1007 ordinal: usize,
1008 value: &Value,
1009 provenance: Provenance,
1010) -> Part {
1011 let media_type = value
1012 .get("media_type")
1013 .or_else(|| value.get("mime_type"))
1014 .and_then(Value::as_str)
1015 .map(ToOwned::to_owned);
1016 let file_name = value
1017 .get("file_name")
1018 .or_else(|| value.get("name"))
1019 .and_then(Value::as_str)
1020 .map(ToOwned::to_owned);
1021 let data = if let Some(source) = value.get("source") {
1022 if let Some(url) = source.get("url").and_then(Value::as_str) {
1023 FileData::Url(url.to_owned())
1024 } else if let Some(bytes) = source.get("data").and_then(Value::as_str) {
1025 FileData::String(bytes.to_owned())
1026 } else {
1027 FileData::String(compact_json(source))
1028 }
1029 } else if let Some(url) = value.get("url").and_then(Value::as_str) {
1030 FileData::Url(url.to_owned())
1031 } else {
1032 FileData::String(compact_json(value))
1033 };
1034
1035 Part {
1036 session_id: session_id.to_owned(),
1037 id: part_id(message_id, ordinal),
1038 message_id: message_id.to_owned(),
1039 ordinal: part_ordinal(ordinal),
1040 provenance,
1041 options: empty_options(),
1042 kind: PartKind::File {
1043 media_type,
1044 file_name,
1045 data,
1046 },
1047 }
1048}
1049
1050fn row_options(row: &Value, line: usize) -> ProviderOptions {
1051 let mut options = ProviderOptions::new();
1052 let source = json!({
1053 "line": line,
1054 "parent_uuid": row.get("parentUuid"),
1055 "is_sidechain": row.get("isSidechain"),
1056 "user_type": row.get("userType"),
1057 "entrypoint": row.get("entrypoint"),
1058 "cwd": row.get("cwd"),
1059 "version": row.get("version"),
1060 "git_branch": row.get("gitBranch"),
1061 "request_id": row.get("requestId"),
1062 "raw_type": row.get("type"),
1063 "raw_record": extract_raw_record(row),
1064 });
1065 options.insert("source".to_owned(), source);
1066 options
1067}
1068
1069fn assistant_options(row: &Value, message_value: &Value, line: usize) -> ProviderOptions {
1070 let mut options = row_options(row, line);
1071 let anthropic = json!({
1072 "id": message_value.get("id"),
1073 "model": message_value.get("model"),
1074 "stop_reason": message_value.get("stop_reason"),
1075 "stop_sequence": message_value.get("stop_sequence"),
1076 "usage": message_value.get("usage"),
1077 });
1078 options.insert("anthropic".to_owned(), anthropic);
1079 options
1080}
1081
1082fn signature_options(value: &Value) -> ProviderOptions {
1083 let mut options = ProviderOptions::new();
1084 if let Some(signature) = value.get("signature").and_then(Value::as_str) {
1085 options.insert("anthropic".to_owned(), json!({"signature": signature}));
1086 }
1087 options
1088}
1089
1090fn attachment_content(value: &Value) -> Option<Extracted<String>> {
1091 extract_str(value, "content").or_else(|| extract_str(value, "stdout"))
1092}
1093
1094fn parse_timestamp(value: &Value) -> anyhow::Result<DateTime<Utc>> {
1095 let timestamp = value
1096 .get("timestamp")
1097 .and_then(Value::as_str)
1098 .context("missing timestamp")?;
1099 Ok(DateTime::parse_from_rfc3339(timestamp)
1100 .context("invalid timestamp")?
1101 .with_timezone(&Utc))
1102}
1103
1104fn is_tool_result(value: &Value) -> bool {
1105 value.get("type").and_then(Value::as_str) == Some("tool_result")
1106}
1107
1108fn is_meta_row(row: &Value) -> bool {
1111 row.get("isMeta").and_then(Value::as_bool) == Some(true)
1112}
1113
/// True when `text`, ignoring leading whitespace, starts with one of the known wrapper
/// markers listed below.
fn is_injected_user_text(text: &str) -> bool {
    const INJECTED_PREFIXES: [&str; 7] = [
        "<task-notification>",
        "<command-name>",
        "<command-message>",
        "<command-args>",
        "<local-command-caveat>",
        "<local-command-stdout>",
        "[Request interrupted by user",
    ];
    let body = text.trim_start();
    INJECTED_PREFIXES
        .iter()
        .any(|&prefix| body.starts_with(prefix))
}
1127
1128fn user_text_provenance(row: &Value, text: &str) -> Provenance {
1131 if is_meta_row(row) || is_injected_user_text(text) {
1132 Provenance::Injected
1133 } else {
1134 Provenance::Conversational
1135 }
1136}
1137
1138fn user_array_provenance(row: &Value, items: &[Value]) -> Provenance {
1142 if is_meta_row(row) {
1143 return Provenance::Injected;
1144 }
1145 let wrapped = items.iter().any(|item| {
1146 item.get("type").and_then(Value::as_str) == Some("text")
1147 && item
1148 .get("text")
1149 .and_then(Value::as_str)
1150 .is_some_and(is_injected_user_text)
1151 });
1152 if wrapped {
1153 Provenance::Injected
1154 } else {
1155 Provenance::Conversational
1156 }
1157}
1158
1159#[cfg(test)]
1160mod tests {
1161 #![allow(clippy::expect_used, clippy::unwrap_used)]
1169
1170 use super::*;
1171 use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
1172 use tempfile::TempDir;
1173
1174 const FIXTURE_ROOT: &str = concat!(
1177 env!("CARGO_MANIFEST_DIR"),
1178 "/tests/fixtures/adapter/claude_code/projects"
1179 );
1180
1181 #[test]
1182 fn probe_default_finds_claude_projects_under_home() -> anyhow::Result<()> {
1183 crate::adapter::test_support::assert_probe_default(
1184 &ClaudeCodeFactory,
1185 &[".claude", "projects"],
1186 )
1187 }
1188
1189 #[tokio::test(flavor = "multi_thread")]
1190 async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
1191 let adapter = ClaudeCodeAdapter::new(FIXTURE_ROOT);
1192 crate::adapter::test_support::assert_native_restore(
1193 &ClaudeCodeFactory,
1194 &adapter,
1195 std::path::Path::new(FIXTURE_ROOT),
1196 )
1197 .await
1198 }
1199
1200 #[tokio::test(flavor = "multi_thread")]
1208 async fn subagent_file_derives_child_session_with_parent_link() -> anyhow::Result<()> {
1209 let corpus = TempDir::new()?;
1210 let project_dir = corpus.path().join("-tmp-pond-test");
1211 let parent_uuid = "11111111-1111-1111-1111-111111111111";
1212 let agent_hash = "abc123def456";
1213 std::fs::create_dir_all(project_dir.join(parent_uuid).join("subagents"))?;
1214
1215 let parent_row = serde_json::json!({
1217 "type": "user",
1218 "uuid": "u-parent-1",
1219 "sessionId": parent_uuid,
1220 "cwd": "/tmp/pond-test",
1221 "timestamp": "2026-05-16T00:00:00.000Z",
1222 "version": "2.1.121",
1223 "message": {"role": "user", "content": "hi parent"},
1224 });
1225 std::fs::write(
1226 project_dir.join(format!("{parent_uuid}.jsonl")),
1227 format!("{parent_row}\n"),
1228 )?;
1229
1230 let subagent_row = serde_json::json!({
1233 "type": "user",
1234 "uuid": "u-sub-1",
1235 "sessionId": parent_uuid,
1236 "cwd": "/tmp/pond-test",
1237 "isSidechain": true,
1238 "agentId": agent_hash,
1239 "timestamp": "2026-05-16T00:01:00.000Z",
1240 "version": "2.1.121",
1241 "message": {"role": "user", "content": "subagent prompt"},
1242 });
1243 std::fs::write(
1244 project_dir
1245 .join(parent_uuid)
1246 .join("subagents")
1247 .join(format!("agent-{agent_hash}.jsonl")),
1248 format!("{subagent_row}\n"),
1249 )?;
1250 std::fs::write(
1251 project_dir
1252 .join(parent_uuid)
1253 .join("subagents")
1254 .join(format!("agent-{agent_hash}.meta.json")),
1255 r#"{"agentType":"general-purpose","description":"do a thing"}"#,
1256 )?;
1257
1258 let store_dir = TempDir::new()?;
1259 let store = Store::open_local(store_dir.path()).await?;
1260 let adapter = ClaudeCodeAdapter::new(corpus.path());
1261
1262 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1263 assert_eq!(
1264 summary.dropped_sessions, 0,
1265 "subagent file must NOT collide with parent (pre-fix this was the project-immutable rejection)"
1266 );
1267
1268 let parent = store
1269 .get_session(parent_uuid)
1270 .await?
1271 .expect("parent session should ingest as the bare uuid");
1272 assert_eq!(parent.session.source_agent, "claude-code");
1273 assert_eq!(parent.session.parent_session_id, None);
1274
1275 let child_id = format!("{parent_uuid}/agent-{agent_hash}");
1276 let child = store
1277 .get_session(&child_id)
1278 .await?
1279 .expect("subagent session must surface under the derived id");
1280 assert_eq!(
1281 child.session.source_agent, "claude-code/general-purpose",
1282 "agent_type from .meta.json should suffix the source_agent label"
1283 );
1284 assert_eq!(
1285 child.session.parent_session_id.as_deref(),
1286 Some(parent_uuid),
1287 "subagent must link back to parent via parent_session_id",
1288 );
1289 let subagent_meta = child
1290 .session
1291 .options
1292 .get("subagent")
1293 .expect("options.subagent must carry the hash + verbatim meta.json");
1294 assert_eq!(subagent_meta["hash"], serde_json::json!(agent_hash));
1295 assert_eq!(
1296 subagent_meta["meta"]["agentType"],
1297 serde_json::json!("general-purpose")
1298 );
1299 assert_eq!(
1300 subagent_meta["meta"]["description"],
1301 serde_json::json!("do a thing")
1302 );
1303 Ok(())
1304 }
1305
1306 #[tokio::test(flavor = "multi_thread")]
1310 async fn subagent_without_meta_falls_back_to_generic_label() -> anyhow::Result<()> {
1311 let corpus = TempDir::new()?;
1312 let project_dir = corpus.path().join("-tmp-pond-test");
1313 let parent_uuid = "22222222-2222-2222-2222-222222222222";
1314 let agent_hash = "deadbeef";
1315 std::fs::create_dir_all(project_dir.join(parent_uuid).join("subagents"))?;
1316 let row = serde_json::json!({
1317 "type": "user",
1318 "uuid": "u-sub-only",
1319 "sessionId": parent_uuid,
1320 "cwd": "/tmp/pond-test",
1321 "timestamp": "2026-05-16T00:00:00.000Z",
1322 "message": {"role": "user", "content": "no meta sibling here"},
1323 });
1324 std::fs::write(
1325 project_dir
1326 .join(parent_uuid)
1327 .join("subagents")
1328 .join(format!("agent-{agent_hash}.jsonl")),
1329 format!("{row}\n"),
1330 )?;
1331
1332 let store_dir = TempDir::new()?;
1333 let store = Store::open_local(store_dir.path()).await?;
1334 let adapter = ClaudeCodeAdapter::new(corpus.path());
1335 let _summary =
1336 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1337
1338 let child = store
1339 .get_session(&format!("{parent_uuid}/agent-{agent_hash}"))
1340 .await?
1341 .expect("derived child id even without meta");
1342 assert_eq!(child.session.source_agent, "claude-code/subagent");
1343 Ok(())
1344 }
1345
1346 #[tokio::test(flavor = "multi_thread")]
1354 async fn workflow_nested_subagent_derives_distinct_child_not_parent_collision()
1355 -> anyhow::Result<()> {
1356 let corpus = TempDir::new()?;
1357 let project_dir = corpus.path().join("-tmp-pond-test");
1358 let parent_uuid = "44444444-4444-4444-4444-444444444444";
1359 let wf_id = "wf_abcd1234-ef0";
1360 let agent_hash = "cafef00dbaadf00d1";
1361 let wf_dir = project_dir
1362 .join(parent_uuid)
1363 .join("subagents")
1364 .join("workflows")
1365 .join(wf_id);
1366 std::fs::create_dir_all(&wf_dir)?;
1367
1368 let parent_row = serde_json::json!({
1369 "type": "user",
1370 "uuid": "u-parent-1",
1371 "sessionId": parent_uuid,
1372 "cwd": "/tmp/pond-test",
1373 "timestamp": "2026-05-20T00:00:00.000Z",
1374 "message": {"role": "user", "content": "hi parent"},
1375 });
1376 std::fs::write(
1377 project_dir.join(format!("{parent_uuid}.jsonl")),
1378 format!("{parent_row}\n"),
1379 )?;
1380
1381 let subagent_row = serde_json::json!({
1383 "type": "user",
1384 "uuid": "u-wf-sub-1",
1385 "sessionId": parent_uuid,
1386 "cwd": "/tmp/pond-test/packages/sub",
1387 "isSidechain": true,
1388 "agentId": agent_hash,
1389 "timestamp": "2026-05-20T00:01:00.000Z",
1390 "message": {"role": "user", "content": "workflow subagent prompt"},
1391 });
1392 std::fs::write(
1393 wf_dir.join(format!("agent-{agent_hash}.jsonl")),
1394 format!("{subagent_row}\n"),
1395 )?;
1396 std::fs::write(
1397 wf_dir.join(format!("agent-{agent_hash}.meta.json")),
1398 r#"{"agentType":"general-purpose","description":"workflow child"}"#,
1399 )?;
1400
1401 let store_dir = TempDir::new()?;
1402 let store = Store::open_local(store_dir.path()).await?;
1403 let adapter = ClaudeCodeAdapter::new(corpus.path());
1404 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1405 assert_eq!(
1406 summary.dropped_sessions, 0,
1407 "nested workflow subagent must NOT collide with the parent project",
1408 );
1409
1410 let parent = store
1411 .get_session(parent_uuid)
1412 .await?
1413 .expect("parent session ingests under the bare uuid");
1414 assert_eq!(&*parent.session.project, "/tmp/pond-test");
1415 assert_eq!(parent.session.parent_session_id, None);
1416
1417 let child_id = format!("{parent_uuid}/workflows/{wf_id}/agent-{agent_hash}");
1418 let child = store
1419 .get_session(&child_id)
1420 .await?
1421 .expect("workflow subagent surfaces under the full nested child id");
1422 assert_eq!(child.session.source_agent, "claude-code/general-purpose");
1423 assert_eq!(
1424 child.session.parent_session_id.as_deref(),
1425 Some(parent_uuid)
1426 );
1427 assert_eq!(
1428 &*child.session.project, "/tmp/pond-test/packages/sub",
1429 "child keeps its own cwd-derived project, distinct from the parent",
1430 );
1431 let subagent_meta = child
1432 .session
1433 .options
1434 .get("subagent")
1435 .expect("options.subagent present");
1436 assert_eq!(subagent_meta["hash"], serde_json::json!(agent_hash));
1437 Ok(())
1438 }
1439
1440 #[tokio::test(flavor = "multi_thread")]
1446 async fn unrecognized_subagents_file_fails_visibly_not_merged() -> anyhow::Result<()> {
1447 let corpus = TempDir::new()?;
1448 let project_dir = corpus.path().join("-tmp-pond-test");
1449 let parent_uuid = "55555555-5555-5555-5555-555555555555";
1450 let unknown_dir = project_dir
1451 .join(parent_uuid)
1452 .join("subagents")
1453 .join("workflows")
1454 .join("wf_future01-aaa");
1455 std::fs::create_dir_all(&unknown_dir)?;
1456
1457 let parent_row = serde_json::json!({
1458 "type": "user",
1459 "uuid": "u-parent-only",
1460 "sessionId": parent_uuid,
1461 "cwd": "/tmp/pond-test",
1462 "timestamp": "2026-05-20T00:00:00.000Z",
1463 "message": {"role": "user", "content": "parent message"},
1464 });
1465 std::fs::write(
1466 project_dir.join(format!("{parent_uuid}.jsonl")),
1467 format!("{parent_row}\n"),
1468 )?;
1469
1470 let unknown_row = serde_json::json!({
1473 "type": "user",
1474 "uuid": "u-should-not-merge",
1475 "sessionId": parent_uuid,
1476 "cwd": "/tmp/pond-test",
1477 "timestamp": "2026-05-20T00:02:00.000Z",
1478 "message": {"role": "user", "content": "must not land under parent"},
1479 });
1480 std::fs::write(
1481 unknown_dir.join("transcript-001.jsonl"),
1482 format!("{unknown_row}\n"),
1483 )?;
1484
1485 let store_dir = TempDir::new()?;
1486 let store = Store::open_local(store_dir.path()).await?;
1487 let adapter = ClaudeCodeAdapter::new(corpus.path());
1488 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1489
1490 assert_eq!(
1491 summary.skipped_files, 1,
1492 "the unrecognized subagents/ transcript must be a visible, counted skip",
1493 );
1494 let parent = store
1495 .get_session(parent_uuid)
1496 .await?
1497 .expect("parent session ingests");
1498 assert_eq!(
1499 parent.messages.len(),
1500 1,
1501 "the unrecognized file's row must NOT be merged into the parent session",
1502 );
1503 assert!(
1504 parent
1505 .messages
1506 .iter()
1507 .all(|m| m.message.id() != "u-should-not-merge"),
1508 "parent must not absorb the unrecognized file's message",
1509 );
1510 Ok(())
1511 }
1512
1513 #[tokio::test(flavor = "multi_thread")]
1521 async fn unrecognized_subagents_file_stays_visible_under_parent_watermark() -> anyhow::Result<()>
1522 {
1523 struct ParentAlreadyFresh;
1524 impl crate::adapter::SkipOracle for ParentAlreadyFresh {
1525 fn last_ingested_at(&self, _session_id: &str) -> Option<DateTime<Utc>> {
1526 Some(
1530 DateTime::parse_from_rfc3339("2999-01-01T00:00:00Z")
1531 .unwrap()
1532 .with_timezone(&Utc),
1533 )
1534 }
1535 fn is_empty(&self) -> bool {
1536 false
1537 }
1538 }
1539
1540 let corpus = TempDir::new()?;
1541 let project_dir = corpus.path().join("-tmp-pond-test");
1542 let parent_uuid = "66666666-6666-6666-6666-666666666666";
1543 let unknown_dir = project_dir
1544 .join(parent_uuid)
1545 .join("subagents")
1546 .join("workflows")
1547 .join("wf_future02-bbb");
1548 std::fs::create_dir_all(&unknown_dir)?;
1549
1550 let parent_row = serde_json::json!({
1551 "type": "user",
1552 "uuid": "u-parent-fresh",
1553 "sessionId": parent_uuid,
1554 "cwd": "/tmp/pond-test",
1555 "timestamp": "2026-05-20T00:00:00.000Z",
1556 "message": {"role": "user", "content": "parent message"},
1557 });
1558 std::fs::write(
1559 project_dir.join(format!("{parent_uuid}.jsonl")),
1560 format!("{parent_row}\n"),
1561 )?;
1562
1563 let unknown_row = serde_json::json!({
1566 "type": "user",
1567 "uuid": "u-resync-should-stay-visible",
1568 "sessionId": parent_uuid,
1569 "cwd": "/tmp/pond-test",
1570 "timestamp": "2026-05-20T00:02:00.000Z",
1571 "message": {"role": "user", "content": "must stay visible"},
1572 });
1573 std::fs::write(
1574 unknown_dir.join("transcript-002.jsonl"),
1575 format!("{unknown_row}\n"),
1576 )?;
1577
1578 let store_dir = TempDir::new()?;
1579 let store = Store::open_local(store_dir.path()).await?;
1580 let adapter = ClaudeCodeAdapter::new(corpus.path());
1581 let summary = ingest_adapter(&store, &adapter, &ParentAlreadyFresh, |_| {}).await?;
1582
1583 assert_eq!(
1584 summary.skipped_files, 1,
1585 "the unrecognized transcript must stay a visible Unsupported skip, not be fresh-skipped under the parent's watermark",
1586 );
1587 assert_eq!(
1590 summary.skipped_fresh, 1,
1591 "only the parent may fresh-skip; the unrecognized file must not borrow its watermark",
1592 );
1593 Ok(())
1594 }
1595
1596 #[tokio::test(flavor = "multi_thread")]
1601 async fn replay_duplicates_are_dedup_at_adapter_layer() -> anyhow::Result<()> {
1602 let corpus = TempDir::new()?;
1603 let project_dir = corpus.path().join("-tmp-pond-test");
1604 std::fs::create_dir_all(&project_dir)?;
1605 let session_uuid = "33333333-3333-3333-3333-333333333333";
1606 let dup_uuid = "u-shared-1";
1607 let row = serde_json::json!({
1608 "type": "user",
1609 "uuid": dup_uuid,
1610 "sessionId": session_uuid,
1611 "cwd": "/tmp/pond-test",
1612 "timestamp": "2026-05-16T00:00:00.000Z",
1613 "message": {"role": "user", "content": "replayed three times"},
1614 });
1615 let body = format!("{row}\n{row}\n{row}\n");
1617 std::fs::write(project_dir.join(format!("{session_uuid}.jsonl")), body)?;
1618
1619 let store_dir = TempDir::new()?;
1620 let store = Store::open_local(store_dir.path()).await?;
1621 let adapter = ClaudeCodeAdapter::new(corpus.path());
1622 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1623
1624 assert_eq!(
1625 summary.dropped_events, 0,
1626 "adapter must dedupe replays before they reach the validator"
1627 );
1628 assert!(
1629 !summary
1630 .drop_reasons
1631 .contains_key(crate::sessions::DROP_REASON_DUPLICATE_MESSAGE_ID),
1632 "duplicate_message_id bucket stays empty when adapter does its job"
1633 );
1634 Ok(())
1635 }
1636
1637 #[tokio::test(flavor = "multi_thread")]
1641 async fn tool_result_name_resolves_from_prior_tool_use_in_same_file() -> anyhow::Result<()> {
1642 let corpus = TempDir::new()?;
1643 let project_dir = corpus.path().join("-tmp-pond-test");
1644 std::fs::create_dir_all(&project_dir)?;
1645 let session_uuid = "44444444-4444-4444-4444-444444444444";
1646 let call_id = "toolu_test_01";
1647
1648 let tool_use_row = serde_json::json!({
1649 "type": "assistant",
1650 "uuid": "u-call",
1651 "sessionId": session_uuid,
1652 "cwd": "/tmp/pond-test",
1653 "timestamp": "2026-05-16T00:00:00.000Z",
1654 "message": {
1655 "role": "assistant",
1656 "content": [{
1657 "type": "tool_use",
1658 "id": call_id,
1659 "name": "Edit",
1660 "input": {"file_path": "/tmp/foo"},
1661 }],
1662 },
1663 });
1664 let tool_result_row = serde_json::json!({
1665 "type": "user",
1666 "uuid": "u-result",
1667 "sessionId": session_uuid,
1668 "cwd": "/tmp/pond-test",
1669 "timestamp": "2026-05-16T00:00:01.000Z",
1670 "message": {
1671 "role": "user",
1672 "content": [{
1673 "type": "tool_result",
1674 "tool_use_id": call_id,
1675 "content": "ok",
1676 }],
1677 },
1678 });
1679 std::fs::write(
1680 project_dir.join(format!("{session_uuid}.jsonl")),
1681 format!("{tool_use_row}\n{tool_result_row}\n"),
1682 )?;
1683
1684 let store_dir = TempDir::new()?;
1685 let store = Store::open_local(store_dir.path()).await?;
1686 let adapter = ClaudeCodeAdapter::new(corpus.path());
1687 let _summary =
1688 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1689 let session = store
1690 .get_session(session_uuid)
1691 .await?
1692 .expect("session ingests");
1693
1694 let mut saw_call = false;
1695 let mut saw_result = false;
1696 for stored in &session.messages {
1697 for part in &stored.parts {
1698 match &part.kind {
1699 PartKind::ToolCall {
1700 call_id: cid, name, ..
1701 } => {
1702 assert_eq!(cid.as_ref().map(|e| e.as_str()), Some(call_id));
1703 assert_eq!(
1704 name.as_ref().map(|e| e.as_str()),
1705 Some("Edit"),
1706 "tool_use carries the name directly"
1707 );
1708 saw_call = true;
1709 }
1710 PartKind::ToolResult {
1711 call_id: cid, name, ..
1712 } => {
1713 assert_eq!(cid.as_ref().map(|e| e.as_str()), Some(call_id));
1714 assert_eq!(
1715 name.as_ref().map(|e| e.as_str()),
1716 Some("Edit"),
1717 "tool_result resolves the name via the per-file map (was 'unknown' pre-2026-05-16)"
1718 );
1719 saw_result = true;
1720 }
1721 _ => {}
1722 }
1723 }
1724 }
1725 assert!(saw_call && saw_result, "both parts must be present");
1726 Ok(())
1727 }
1728
1729 #[test]
1733 fn user_text_provenance_separates_prompts_from_harness_injection() {
1734 let prompt = json!({"type": "user", "uuid": "u1"});
1735 assert_eq!(
1736 user_text_provenance(&prompt, "please refactor the parser"),
1737 Provenance::Conversational,
1738 );
1739
1740 let notification = json!({"type": "user", "uuid": "u2"});
1741 assert_eq!(
1742 user_text_provenance(
1743 ¬ification,
1744 "<task-notification>background task done</task-notification>",
1745 ),
1746 Provenance::Injected,
1747 );
1748
1749 let meta = json!({"type": "user", "uuid": "u3", "isMeta": true});
1750 assert_eq!(
1751 user_text_provenance(&meta, "expanded skill body"),
1752 Provenance::Injected,
1753 );
1754 }
1755
1756 #[tokio::test(flavor = "multi_thread")]
1760 async fn task_notification_message_yields_injected_parts() -> anyhow::Result<()> {
1761 let corpus = TempDir::new()?;
1762 let project_dir = corpus.path().join("-tmp-pond-test");
1763 std::fs::create_dir_all(&project_dir)?;
1764 let session_uuid = "66666666-6666-6666-6666-666666666666";
1765 let prompt = serde_json::json!({
1766 "type": "user",
1767 "uuid": "u-prompt",
1768 "sessionId": session_uuid,
1769 "cwd": "/tmp/pond-test",
1770 "timestamp": "2026-05-16T00:00:00.000Z",
1771 "message": {"role": "user", "content": "genuine human prompt"},
1772 });
1773 let notification = serde_json::json!({
1774 "type": "user",
1775 "uuid": "u-notify",
1776 "sessionId": session_uuid,
1777 "cwd": "/tmp/pond-test",
1778 "timestamp": "2026-05-16T00:00:01.000Z",
1779 "message": {
1780 "role": "user",
1781 "content": "<task-notification>a background task finished</task-notification>",
1782 },
1783 });
1784 std::fs::write(
1785 project_dir.join(format!("{session_uuid}.jsonl")),
1786 format!("{prompt}\n{notification}\n"),
1787 )?;
1788
1789 let store_dir = TempDir::new()?;
1790 let store = Store::open_local(store_dir.path()).await?;
1791 let adapter = ClaudeCodeAdapter::new(corpus.path());
1792 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1793
1794 let session = store
1795 .get_session(session_uuid)
1796 .await?
1797 .expect("session ingests");
1798 let mut saw_prompt = false;
1799 let mut saw_notification = false;
1800 for stored in &session.messages {
1801 for part in &stored.parts {
1802 if stored.message.id() == "u-prompt" {
1803 assert_eq!(part.provenance, crate::wire::Provenance::Conversational);
1804 saw_prompt = true;
1805 }
1806 if stored.message.id() == "u-notify" {
1807 assert_eq!(part.provenance, crate::wire::Provenance::Injected);
1808 saw_notification = true;
1809 }
1810 }
1811 }
1812 assert!(saw_prompt && saw_notification, "both messages present");
1813 Ok(())
1814 }
1815
1816 #[tokio::test(flavor = "multi_thread")]
1820 async fn orphan_tool_result_yields_name_none_not_unknown_sentinel() -> anyhow::Result<()> {
1821 let corpus = TempDir::new()?;
1822 let project_dir = corpus.path().join("-tmp-pond-test");
1823 std::fs::create_dir_all(&project_dir)?;
1824 let session_uuid = "55555555-5555-5555-5555-555555555555";
1825
1826 let row = serde_json::json!({
1828 "type": "user",
1829 "uuid": "u-orphan",
1830 "sessionId": session_uuid,
1831 "cwd": "/tmp/pond-test",
1832 "timestamp": "2026-05-16T00:00:00.000Z",
1833 "message": {
1834 "role": "user",
1835 "content": [{
1836 "type": "tool_result",
1837 "tool_use_id": "toolu_orphan",
1838 "content": "result body, no matching call",
1839 }],
1840 },
1841 });
1842 std::fs::write(
1843 project_dir.join(format!("{session_uuid}.jsonl")),
1844 format!("{row}\n"),
1845 )?;
1846
1847 let store_dir = TempDir::new()?;
1848 let store = Store::open_local(store_dir.path()).await?;
1849 let adapter = ClaudeCodeAdapter::new(corpus.path());
1850 let _summary =
1851 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1852 let session = store
1853 .get_session(session_uuid)
1854 .await?
1855 .expect("session ingests");
1856 let mut found = false;
1857 for stored in &session.messages {
1858 for part in &stored.parts {
1859 if let PartKind::ToolResult { name, call_id, .. } = &part.kind {
1860 assert_eq!(call_id.as_ref().map(|e| e.as_str()), Some("toolu_orphan"));
1861 assert!(
1862 name.is_none(),
1863 "orphan tool_result must be name=None, not synthesized 'unknown'",
1864 );
1865 found = true;
1866 }
1867 }
1868 }
1869 assert!(found, "orphan tool_result part must be present");
1870 Ok(())
1872 }
1873
1874 #[test]
1878 fn workflow_journal_is_a_control_file_not_unsupported() {
1879 let adapter = ClaudeCodeAdapter::new("/tmp/pond-test-root");
1880 let journal = std::path::Path::new(
1881 "/root/-proj/55555555-5555-5555-5555-555555555555/subagents/workflows/wf_030e6487-da6/journal.jsonl",
1882 );
1883 assert!(is_workflow_control_file(journal));
1884 assert!(
1885 adapter.unsupported_reason(journal).is_none(),
1886 "journal.jsonl is a known control file, not an unsupported layout",
1887 );
1888 }
1889
1890 #[test]
1894 fn unknown_subagents_leaf_is_still_unsupported() {
1895 let adapter = ClaudeCodeAdapter::new("/tmp/pond-test-root");
1896 let unknown = std::path::Path::new(
1897 "/root/-proj/PARENT/subagents/workflows/wf_x/transcript-001.jsonl",
1898 );
1899 assert!(
1900 adapter.unsupported_reason(unknown).is_some(),
1901 "an unrecognized non-agent, non-journal leaf must still fail visibly",
1902 );
1903 assert!(!is_workflow_control_file(unknown));
1904
1905 let agent = std::path::Path::new("/root/-proj/PARENT/subagents/agent-abc123def456.jsonl");
1906 assert!(
1907 adapter.unsupported_reason(agent).is_none(),
1908 "a recognized agent transcript is resolvable, not unsupported",
1909 );
1910 }
1911
1912 #[tokio::test(flavor = "multi_thread")]
1917 async fn workflow_journal_skipped_benignly_while_sibling_agent_ingests() -> anyhow::Result<()> {
1918 let corpus = TempDir::new()?;
1919 let project_dir = corpus.path().join("-tmp-pond-test");
1920 let parent_uuid = "77777777-7777-7777-7777-777777777777";
1921 let wf_id = "wf_030e6487-da6";
1922 let agent_hash = "a38f4724ef3864da8";
1923 let wf_dir = project_dir
1924 .join(parent_uuid)
1925 .join("subagents")
1926 .join("workflows")
1927 .join(wf_id);
1928 std::fs::create_dir_all(&wf_dir)?;
1929
1930 let parent_row = serde_json::json!({
1931 "type": "user",
1932 "uuid": "u-parent-1",
1933 "sessionId": parent_uuid,
1934 "cwd": "/tmp/pond-test",
1935 "timestamp": "2026-06-04T00:00:00.000Z",
1936 "message": {"role": "user", "content": "hi parent"},
1937 });
1938 std::fs::write(
1939 project_dir.join(format!("{parent_uuid}.jsonl")),
1940 format!("{parent_row}\n"),
1941 )?;
1942
1943 let agent_row = serde_json::json!({
1944 "type": "user",
1945 "uuid": "u-agent-1",
1946 "sessionId": parent_uuid,
1947 "cwd": "/tmp/pond-test",
1948 "timestamp": "2026-06-04T00:01:00.000Z",
1949 "message": {"role": "user", "content": "workflow agent prompt"},
1950 });
1951 std::fs::write(
1952 wf_dir.join(format!("agent-{agent_hash}.jsonl")),
1953 format!("{agent_row}\n"),
1954 )?;
1955
1956 std::fs::write(
1958 wf_dir.join("journal.jsonl"),
1959 "{\"type\":\"started\",\"key\":\"v2:abc\",\"agentId\":\"a38f\"}\n\
1960 {\"type\":\"result\",\"key\":\"v2:abc\",\"agentId\":\"a38f\",\"result\":{}}\n",
1961 )?;
1962
1963 let store_dir = TempDir::new()?;
1964 let store = Store::open_local(store_dir.path()).await?;
1965 let adapter = ClaudeCodeAdapter::new(corpus.path());
1966 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1967 assert_eq!(
1968 summary.skipped_files, 0,
1969 "journal.jsonl is a control file (benign Empty skip), not an unsupported failure",
1970 );
1971
1972 let child = store
1973 .get_session(&format!(
1974 "{parent_uuid}/workflows/{wf_id}/agent-{agent_hash}"
1975 ))
1976 .await?
1977 .expect("the sibling agent transcript still ingests as a child session");
1978 assert_eq!(
1979 child.session.parent_session_id.as_deref(),
1980 Some(parent_uuid)
1981 );
1982
1983 let parent = store
1984 .get_session(parent_uuid)
1985 .await?
1986 .expect("parent session ingests");
1987 assert_eq!(
1988 parent.messages.len(),
1989 1,
1990 "journal rows must NOT merge into the parent session",
1991 );
1992 Ok(())
1993 }
1994
1995 #[tokio::test(flavor = "multi_thread")]
1999 async fn workflow_journal_with_parent_sessionid_still_not_merged() -> anyhow::Result<()> {
2000 let corpus = TempDir::new()?;
2001 let project_dir = corpus.path().join("-tmp-pond-test");
2002 let parent_uuid = "88888888-8888-8888-8888-888888888888";
2003 let wf_dir = project_dir
2004 .join(parent_uuid)
2005 .join("subagents")
2006 .join("workflows")
2007 .join("wf_abc01234-def");
2008 std::fs::create_dir_all(&wf_dir)?;
2009
2010 let parent_row = serde_json::json!({
2011 "type": "user",
2012 "uuid": "u-parent",
2013 "sessionId": parent_uuid,
2014 "cwd": "/tmp/pond-test",
2015 "timestamp": "2026-06-04T00:00:00.000Z",
2016 "message": {"role": "user", "content": "parent only"},
2017 });
2018 std::fs::write(
2019 project_dir.join(format!("{parent_uuid}.jsonl")),
2020 format!("{parent_row}\n"),
2021 )?;
2022
2023 let journal_row = serde_json::json!({
2026 "type": "started",
2027 "key": "v2:abc",
2028 "agentId": "a1",
2029 "sessionId": parent_uuid,
2030 "message": {"role": "user", "content": "must not merge"},
2031 });
2032 std::fs::write(wf_dir.join("journal.jsonl"), format!("{journal_row}\n"))?;
2033
2034 let store_dir = TempDir::new()?;
2035 let store = Store::open_local(store_dir.path()).await?;
2036 let adapter = ClaudeCodeAdapter::new(corpus.path());
2037 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
2038 assert_eq!(
2039 summary.skipped_files, 0,
2040 "journal is a benign Empty skip, not an unsupported failure",
2041 );
2042 let parent = store
2043 .get_session(parent_uuid)
2044 .await?
2045 .expect("parent session ingests");
2046 assert_eq!(
2047 parent.messages.len(),
2048 1,
2049 "journal row must NOT merge even when it carries the parent sessionId",
2050 );
2051 Ok(())
2052 }
2053}