1use std::{
10 collections::{HashMap, HashSet},
11 hash::{Hash, Hasher},
12 path::{Path, PathBuf},
13};
14
15use anyhow::Context as _;
16use chrono::{DateTime, SecondsFormat, Utc};
17use serde_json::{Value, json};
18
19use crate::{
20 sessions::IngestEvent,
21 wire::{FileData, Message, Part, PartKind, Provenance, ProviderOptions, Session},
22};
23
24use super::{
25 Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
26 RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
27 empty_options,
28 extract::{
29 Extracted, Source, extract_compact_repr, extract_raw_record, extract_self_str, extract_str,
30 },
31 extracted_text,
32 jsonl::{
33 BoundedRow, JsonlTree, jsonl_tree_discover, jsonl_tree_events, peek_last_mapped,
34 source_line,
35 },
36 jsonl_bytes, part_id, part_ordinal, raw_record,
37};
38
/// Per-file scratch state carried across the rows of one transcript while
/// they are converted to ingest events.
#[derive(Debug, Default)]
pub(crate) struct FileState {
    /// `(uuid, content hash)` pairs of records already processed; a row whose
    /// pair is already present yields no events.
    seen_records: HashSet<(String, u64)>,
    /// Tool name recorded for each `tool_use` / `server_tool_use` id, so a
    /// later `tool_result` part can carry the name of the call it answers.
    tool_call_names: HashMap<String, Extracted<String>>,
}
65
/// Adapter name: returned by both `name()` implementations, passed to
/// `config_path` / `jsonl_bytes`, and used as the source label of schema errors.
const NAME: &str = "claude-code";
70
/// Stateless [`AdapterFactory`] that opens, probes for, and serializes Claude
/// Code transcripts.
pub struct ClaudeCodeFactory;
74
75impl AdapterFactory for ClaudeCodeFactory {
76 fn name(&self) -> &'static str {
77 NAME
78 }
79
80 fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
81 Ok(Box::new(ClaudeCodeAdapter::new(config_path(NAME, config)?)))
82 }
83
84 fn probe_default(&self, env: &Env) -> Option<Value> {
85 let path = env.home.join(".claude").join("projects");
86 path.exists().then(|| json!({ "path": path }))
87 }
88
89 fn serialize(
90 &self,
91 session: &crate::sessions::SessionWithMessages,
92 fidelity: RestoreFidelity,
93 ) -> Result<Vec<RestoredFile>, AdapterError> {
94 serialize_session(session, fidelity)
95 }
96}
97
98fn serialize_session(
99 session: &crate::sessions::SessionWithMessages,
100 fidelity: RestoreFidelity,
101) -> Result<Vec<RestoredFile>, AdapterError> {
102 let mut messages = session.messages.clone();
103 if fidelity == RestoreFidelity::Native {
104 messages.sort_by(|left, right| {
105 source_line(left.message.options())
106 .cmp(&source_line(right.message.options()))
107 .then_with(|| by_timestamp_then_id(left, right))
108 });
109 } else {
110 messages.sort_by(by_timestamp_then_id);
111 }
112 let mut records = Vec::with_capacity(messages.len());
116 let mut parent_uuid = None::<String>;
117 for message in &messages {
118 if fidelity == RestoreFidelity::Native
119 && let Some(raw) = raw_record(message.message.options())
120 {
121 parent_uuid = raw
122 .get("uuid")
123 .and_then(Value::as_str)
124 .map(ToOwned::to_owned)
125 .or(parent_uuid);
126 records.push(raw);
127 continue;
128 }
129 let Some(record) = claude_record(session, message, parent_uuid.as_deref()) else {
132 continue;
133 };
134 parent_uuid = record
135 .get("uuid")
136 .and_then(Value::as_str)
137 .map(ToOwned::to_owned);
138 records.push(record);
139 }
140
141 let mut files = vec![RestoredFile::new(
142 claude_relative_path(session),
143 jsonl_bytes(NAME, &records)?,
144 fidelity,
145 )];
146 if session.session.parent_session_id.is_some()
147 && let Some(meta) = subagent_meta_record(session)
148 {
149 let mut meta_path = files[0].relative_path.clone();
150 meta_path.set_extension("meta.json");
151 files.push(RestoredFile::new(
152 meta_path,
153 serde_json::to_vec(&meta).map_err(|err| {
154 AdapterError::schema(
155 NAME,
156 &session.session.id,
157 format!("json encode failed: {err}"),
158 )
159 })?,
160 fidelity,
161 ));
162 }
163 Ok(files)
164}
165
166fn claude_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
167 let encoded_project = session
168 .session
169 .options
170 .get("source")
171 .and_then(|source| source.get("project_dir"))
172 .and_then(Value::as_str)
173 .map(ToOwned::to_owned)
174 .unwrap_or_else(|| encode_project(&session.session.project));
175 if let Some(parent) = &session.session.parent_session_id {
176 let child_suffix = session
181 .session
182 .id
183 .strip_prefix(&format!("{parent}/"))
184 .unwrap_or(&session.session.id);
185 return PathBuf::from(encoded_project)
186 .join(parent)
187 .join("subagents")
188 .join(format!("{child_suffix}.jsonl"));
189 }
190 PathBuf::from(encoded_project).join(format!("{}.jsonl", session.session.id))
191}
192
/// Turns a project path into a directory name by mapping every `/` and `.`
/// to `-`; all other characters are kept as they are.
fn encode_project(project: &str) -> String {
    project
        .chars()
        .map(|ch| if matches!(ch, '/' | '.') { '-' } else { ch })
        .collect()
}
196
197fn subagent_meta_record(session: &crate::sessions::SessionWithMessages) -> Option<Value> {
198 let meta = session.session.options.get("subagent")?.get("meta")?;
202 meta.is_object().then(|| meta.clone())
203}
204
205fn claude_record(
206 session: &crate::sessions::SessionWithMessages,
207 message: &crate::sessions::MessageWithParts,
208 parent_uuid: Option<&str>,
209) -> Option<Value> {
210 let row_role = match &message.message {
218 Message::System { .. } => return None,
219 Message::User { .. } | Message::Tool { .. } => "user",
220 Message::Assistant { .. } => "assistant",
221 };
222 let mut envelope = serde_json::Map::new();
223 envelope.insert("role".to_owned(), Value::String(row_role.to_owned()));
224 if row_role == "assistant" {
225 envelope.insert("type".to_owned(), Value::String("message".to_owned()));
228 }
229 envelope.insert(
230 "content".to_owned(),
231 Value::Array(message.parts.iter().map(claude_part).collect()),
232 );
233 Some(json!({
234 "parentUuid": parent_uuid,
235 "isSidechain": false,
236 "userType": "external",
237 "cwd": &*session.session.project,
238 "sessionId": &session.session.id,
239 "type": row_role,
240 "message": Value::Object(envelope),
241 "uuid": message.message.id(),
242 "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
243 }))
244}
245
246fn claude_part(part: &Part) -> Value {
247 match &part.kind {
248 PartKind::Text { text } => json!({"type": "text", "text": extracted_text(text)}),
249 PartKind::Reasoning { text } => {
250 json!({"type": "thinking", "thinking": extracted_text(text)})
251 }
252 PartKind::ToolCall {
253 call_id,
254 name,
255 params,
256 provider_executed,
257 } => json!({
258 "type": if *provider_executed { "server_tool_use" } else { "tool_use" },
259 "id": extracted_text(call_id),
260 "name": extracted_text(name),
261 "input": params,
262 }),
263 PartKind::ToolResult {
264 call_id,
265 is_failure,
266 result,
267 ..
268 } => json!({
269 "type": "tool_result",
270 "tool_use_id": extracted_text(call_id),
271 "is_error": is_failure,
272 "content": result,
273 }),
274 PartKind::File {
275 media_type,
276 file_name,
277 data,
278 } => json!({
279 "type": "file",
280 "media_type": media_type,
281 "file_name": file_name,
282 "source": file_source(data),
283 }),
284 other => {
285 json!({"type": "text", "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null))})
286 }
287 }
288}
289
290fn file_source(data: &FileData) -> Value {
291 match data {
292 FileData::String(value) => json!({"type": "text", "data": value}),
293 FileData::Bytes(value) => json!({"type": "base64", "data": value}),
294 FileData::Url(value) => json!({"type": "url", "url": value}),
295 }
296}
297
/// Adapter over a directory tree of Claude Code transcript files.
#[derive(Debug, Clone)]
pub struct ClaudeCodeAdapter {
    /// Directory the adapter reports as its root.
    root: PathBuf,
}

impl ClaudeCodeAdapter {
    /// Creates an adapter rooted at `root`; nothing is read from disk here.
    pub fn new(root: impl Into<PathBuf>) -> Self {
        let root: PathBuf = root.into();
        Self { root }
    }
}
310
impl Adapter for ClaudeCodeAdapter {
    /// Delegates discovery to the shared `jsonl_tree_discover` driver.
    fn discover(&self) -> DiscoverFuture<'_> {
        jsonl_tree_discover(self)
    }

    /// Delegates event streaming to `jsonl_tree_events`, handing `oracle`
    /// through unchanged.
    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
        jsonl_tree_events(self, oracle)
    }
}
320
321impl JsonlTree for ClaudeCodeAdapter {
322 type State = FileState;
323
324 fn name(&self) -> &'static str {
325 NAME
326 }
327
328 fn root(&self) -> &Path {
329 &self.root
330 }
331
332 fn peek_session_id(&self, path: &Path, first_line: &str) -> Option<String> {
333 if subagents_dir(path).is_some() {
340 let (parent_uuid, child_suffix, _) = subagent_ids(path)?;
341 return Some(format!("{parent_uuid}/{child_suffix}"));
342 }
343 let row: Value = serde_json::from_str(first_line).ok()?;
344 row.get("sessionId")?.as_str().map(ToOwned::to_owned)
345 }
346
347 fn peek_last_ts(&self, path: &Path) -> Option<i64> {
348 peek_last_mapped(path, |line| {
355 let row: Value = serde_json::from_str(line).ok()?;
356 Some(parse_timestamp(&row).ok()?.timestamp_micros())
357 })
358 }
359
360 fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
361 session_from_rows(path, rows)
362 }
363
364 fn events_from_row(
365 &self,
366 session: &Session,
367 row: &BoundedRow,
368 state: &mut Self::State,
369 ) -> Result<Vec<IngestEvent>, String> {
370 if let Some(uuid) = row.value.get("uuid").and_then(Value::as_str)
371 && !state
372 .seen_records
373 .insert((uuid.to_owned(), source_record_hash(&row.value)))
374 {
375 return Ok(Vec::new());
376 }
377 capture_tool_call_names(&row.value, &mut state.tool_call_names);
378 events_from_row(&session.id, row.line, &row.value, session.created_at, state)
379 }
380
381 fn unsupported_reason(&self, path: &Path) -> Option<String> {
382 if subagents_dir(path).is_some()
391 && subagent_ids(path).is_none()
392 && !is_workflow_control_file(path)
393 {
394 return Some(format!(
395 "{}: subagent transcript layout not recognized by this pond version; \
396 skipped so it is not merged into the parent session - update pond and \
397 re-run `pond sync`",
398 path.display()
399 ));
400 }
401 None
402 }
403}
404
405fn source_record_hash(value: &Value) -> u64 {
409 let mut hasher = std::collections::hash_map::DefaultHasher::new();
410 let pick = |path: &[&str]| -> &Value {
411 let mut cur = value;
412 for key in path {
413 match cur.get(*key) {
414 Some(next) => cur = next,
415 None => return &Value::Null,
416 }
417 }
418 cur
419 };
420 for path in [
421 &["type"][..],
422 &["parentUuid"][..],
423 &["message", "role"][..],
424 &["message", "content"][..],
425 &["toolUseResult"][..],
426 ] {
427 compact_json(pick(path)).hash(&mut hasher);
428 }
429 hasher.finish()
430}
431
432fn is_workflow_control_file(path: &Path) -> bool {
438 subagents_dir(path).is_some()
439 && path.file_name().and_then(|n| n.to_str()) == Some("journal.jsonl")
440}
441
442fn capture_tool_call_names(row: &Value, map: &mut HashMap<String, Extracted<String>>) {
447 let Some(items) = row
448 .get("message")
449 .and_then(|message| message.get("content"))
450 .and_then(Value::as_array)
451 else {
452 return;
453 };
454 for item in items {
455 let kind = item.get("type").and_then(Value::as_str);
456 if !matches!(kind, Some("tool_use") | Some("server_tool_use")) {
457 continue;
458 }
459 let (Some(id), Some(name)) = (item.str_field("id"), extract_str(item, "name")) else {
460 continue;
461 };
462 map.insert(id.to_owned(), name);
463 }
464}
465
466fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
467 let path_display = path.display().to_string();
468 if subagents_dir(path).is_some() && subagent_ids(path).is_none() {
473 return Err(AdapterError::schema(
474 NAME,
475 path_display,
476 "sidecar/control file under subagents/ has no session of its own",
477 ));
478 }
479 let mut created_at = None;
480 let mut project: Option<Extracted<String>> = None;
481 let mut version = None;
482 for row in rows {
483 if created_at.is_none() {
484 created_at = parse_timestamp(&row.value).ok();
485 }
486 if project.is_none() {
487 project = extract_str(&row.value, "cwd");
488 }
489 if version.is_none() {
490 version = row
491 .value
492 .get("version")
493 .and_then(Value::as_str)
494 .map(ToOwned::to_owned);
495 }
496 }
497
498 let first = rows
499 .first()
500 .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
501 let at_first = format!("{path_display}:{}", first.line);
502 let raw_session_id = first
503 .value
504 .get("sessionId")
505 .and_then(Value::as_str)
506 .ok_or_else(|| {
507 AdapterError::schema(
508 NAME,
509 at_first.clone(),
510 format!("line {} missing sessionId", first.line),
511 )
512 })?
513 .to_owned();
514 let created_at = created_at.ok_or_else(|| {
515 AdapterError::schema(NAME, at_first, "session has no parseable timestamp")
516 })?;
517
518 let subagent = subagent_descriptor(path);
531 let project_dir = source_project_dir(path, subagent.is_some());
532 let (session_id, parent_session_id, source_agent, subagent_options) = match subagent {
533 Some(SubagentDescriptor {
534 parent_uuid,
535 child_suffix,
536 agent_hash,
537 agent_type,
538 meta,
539 }) => {
540 let child_id = format!("{parent_uuid}/{child_suffix}");
541 let agent_label = agent_type
542 .as_deref()
543 .map(|t| format!("claude-code/{t}"))
544 .unwrap_or_else(|| "claude-code/subagent".to_owned());
545 let metadata = json!({
549 "hash": agent_hash,
550 "raw_session_id": raw_session_id,
551 "meta": meta,
552 });
553 (child_id, Some(parent_uuid), agent_label, Some(metadata))
554 }
555 None => (raw_session_id, None, "claude-code".to_owned(), None),
556 };
557
558 let project = match project {
559 Some(value) => value,
560 None => {
561 let decoded = path
562 .parent()
563 .and_then(|p| p.file_name())
564 .and_then(|n| n.to_str())
565 .map(|s| s.replace('-', "/"))
566 .ok_or_else(|| {
567 AdapterError::schema(
568 NAME,
569 path_display.clone(),
570 "no `cwd` field in any row and source path is not UTF-8",
571 )
572 })?;
573 extract_self_str(&Value::String(decoded)).ok_or_else(|| {
574 AdapterError::schema(
575 NAME,
576 path_display.clone(),
577 "internal: Value::String produced None from Source::as_str",
578 )
579 })?
580 }
581 };
582
583 let mut options = ProviderOptions::new();
584 options.insert(
585 "source".to_owned(),
586 json!({
587 "adapter": "claude-code",
588 "version": version,
589 "project_dir": project_dir,
590 "workspace_path": &*project,
591 }),
592 );
593 if let Some(metadata) = subagent_options {
594 options.insert("subagent".to_owned(), metadata);
595 }
596
597 Ok(Session {
598 id: session_id,
599 parent_session_id,
600 parent_message_id: None,
601 source_agent,
602 created_at,
603 project,
604 options,
605 })
606}
607
608fn source_project_dir(path: &Path, is_subagent: bool) -> Option<String> {
609 let project_dir = if is_subagent {
614 subagents_dir(path)?.parent()?.parent()
615 } else {
616 path.parent()
617 };
618 project_dir
619 .and_then(|p| p.file_name())
620 .and_then(|n| n.to_str())
621 .map(ToOwned::to_owned)
622}
623
/// Finds the nearest ancestor directory of `path` named `subagents`.
///
/// The path itself is never considered, only its parents, innermost first.
fn subagents_dir(path: &Path) -> Option<&Path> {
    path.ancestors()
        .skip(1) // `ancestors` starts with `path` itself
        .find(|dir| dir.file_name().and_then(|name| name.to_str()) == Some("subagents"))
}
638
/// Everything known about a subagent transcript: the identifiers derived from
/// its path plus whatever its `.meta.json` sidecar provided.
struct SubagentDescriptor {
    /// Name of the directory that contains the `subagents` directory.
    parent_uuid: String,
    /// Transcript path relative to the `subagents` directory, without its
    /// extension and with `/` as separator.
    child_suffix: String,
    /// The part of the file name between `agent-` and `.jsonl`.
    agent_hash: String,
    /// `agentType` string from the sidecar, when the sidecar was readable.
    agent_type: Option<String>,
    /// Full parsed sidecar, when it was readable and valid JSON.
    meta: Option<Value>,
}
651
652fn subagent_ids(path: &Path) -> Option<(String, String, String)> {
660 let file_name = path.file_name()?.to_str()?;
661 let agent_hash = file_name
662 .strip_prefix("agent-")?
663 .strip_suffix(".jsonl")?
664 .to_owned();
665 let subagents = subagents_dir(path)?;
666 let parent_uuid = subagents.parent()?.file_name()?.to_str()?.to_owned();
667 let child_suffix = path
671 .strip_prefix(subagents)
672 .ok()?
673 .with_extension("")
674 .to_str()?
675 .replace(std::path::MAIN_SEPARATOR, "/");
676 Some((parent_uuid, child_suffix, agent_hash))
677}
678
679fn subagent_descriptor(path: &Path) -> Option<SubagentDescriptor> {
682 let (parent_uuid, child_suffix, agent_hash) = subagent_ids(path)?;
683 let meta_path = path.parent()?.join(format!("agent-{agent_hash}.meta.json"));
684 let (agent_type, meta) = match std::fs::read(&meta_path) {
685 Ok(bytes) => match serde_json::from_slice::<Value>(&bytes) {
686 Ok(value) => (
687 value
688 .get("agentType")
689 .and_then(Value::as_str)
690 .map(ToOwned::to_owned),
691 Some(value),
692 ),
693 Err(error) => {
694 tracing::debug!(
695 target: "pond::adapter::claude_code",
696 meta = %meta_path.display(),
697 %error,
698 "subagent .meta.json present but unparseable; falling back to 'claude-code/subagent'",
699 );
700 (None, None)
701 }
702 },
703 Err(error) if error.kind() == std::io::ErrorKind::NotFound => (None, None),
704 Err(error) => {
705 tracing::debug!(
706 target: "pond::adapter::claude_code",
707 meta = %meta_path.display(),
708 %error,
709 "subagent .meta.json IO error; falling back to 'claude-code/subagent'",
710 );
711 (None, None)
712 }
713 };
714
715 Some(SubagentDescriptor {
716 parent_uuid,
717 child_suffix,
718 agent_hash,
719 agent_type,
720 meta,
721 })
722}
723
724fn events_from_row(
725 session_id: &str,
726 line: usize,
727 row: &Value,
728 default_timestamp: DateTime<Utc>,
729 state: &FileState,
730) -> Result<Vec<IngestEvent>, String> {
731 let timestamp = parse_timestamp(row).unwrap_or(default_timestamp);
732 let uuid = row
733 .get("uuid")
734 .and_then(Value::as_str)
735 .map_or_else(|| format!("{session_id}:{line}"), ToOwned::to_owned);
736
737 if let Some(message_value) = row.get("message") {
738 return message_events(
739 session_id,
740 &uuid,
741 timestamp,
742 row,
743 message_value,
744 state,
745 line,
746 );
747 }
748
749 let raw_type = row.get("type").and_then(Value::as_str);
756 let content = if raw_type == Some("attachment") {
757 row.get("attachment")
758 .and_then(attachment_content)
759 .or_else(|| Some(extract_compact_repr(row)))
760 } else {
761 extract_str(row, "subtype").or_else(|| extract_str(row, "type"))
762 };
763 let message = Message::System {
764 id: uuid,
765 session_id: session_id.to_owned(),
766 timestamp,
767 content,
768 options: row_options(row, line),
769 };
770 Ok(vec![IngestEvent::Message(message)])
771}
772
773fn message_events(
774 session_id: &str,
775 uuid: &str,
776 timestamp: DateTime<Utc>,
777 row: &Value,
778 message_value: &Value,
779 state: &FileState,
780 line: usize,
781) -> Result<Vec<IngestEvent>, String> {
782 let role = message_value
783 .get("role")
784 .and_then(Value::as_str)
785 .ok_or_else(|| "message missing role".to_owned())?;
786 let content = message_value.get("content").unwrap_or(&Value::Null);
787 let mut parts = Vec::new();
788 let message = match (role, content) {
789 ("user", Value::String(text)) => {
790 let provenance = user_text_provenance(row, text);
794 parts.push(text_part(
795 session_id,
796 uuid,
797 0,
798 extract_self_str(content),
799 provenance,
800 ));
801 Message::User {
802 id: uuid.to_owned(),
803 session_id: session_id.to_owned(),
804 timestamp,
805 options: row_options(row, line),
806 }
807 }
808 ("user", Value::Array(items)) if items.iter().all(is_tool_result) => {
809 let source_tool_result = row.get("toolUseResult").cloned();
810 parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
811 tool_result_part(
812 session_id,
813 uuid,
814 ordinal,
815 item,
816 source_tool_result.as_ref(),
817 state,
818 )
819 }));
820 Message::Tool {
821 id: uuid.to_owned(),
822 session_id: session_id.to_owned(),
823 timestamp,
824 options: row_options(row, line),
825 }
826 }
827 ("user", Value::Array(items)) => {
828 let provenance = user_array_provenance(row, items);
831 parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
832 user_part(session_id, uuid, ordinal, item, state, provenance)
833 }));
834 Message::User {
835 id: uuid.to_owned(),
836 session_id: session_id.to_owned(),
837 timestamp,
838 options: row_options(row, line),
839 }
840 }
841 ("assistant", Value::Array(items)) => {
842 parts.extend(
843 items
844 .iter()
845 .enumerate()
846 .map(|(ordinal, item)| assistant_part(session_id, uuid, ordinal, item)),
847 );
848 Message::Assistant {
849 id: uuid.to_owned(),
850 session_id: session_id.to_owned(),
851 timestamp,
852 options: assistant_options(row, message_value, line),
853 }
854 }
855 ("system", Value::String(_)) => Message::System {
856 id: uuid.to_owned(),
857 session_id: session_id.to_owned(),
858 timestamp,
859 content: extract_self_str(content),
860 options: row_options(row, line),
861 },
862 ("system", _) => Message::System {
863 id: uuid.to_owned(),
864 session_id: session_id.to_owned(),
865 timestamp,
866 content: Some(extract_compact_repr(message_value)),
871 options: row_options(row, line),
872 },
873 _ => Message::System {
877 id: uuid.to_owned(),
878 session_id: session_id.to_owned(),
879 timestamp,
880 content: Some(extract_compact_repr(message_value)),
881 options: row_options(row, line),
882 },
883 };
884
885 let mut events = Vec::with_capacity(parts.len() + 1);
886 events.push(IngestEvent::Message(message));
887 events.extend(parts.into_iter().map(IngestEvent::Part));
888 Ok(events)
889}
890
891fn text_part(
892 session_id: &str,
893 message_id: &str,
894 ordinal: usize,
895 text: Option<Extracted<String>>,
896 provenance: Provenance,
897) -> Part {
898 Part {
899 session_id: session_id.to_owned(),
900 id: part_id(message_id, ordinal),
901 message_id: message_id.to_owned(),
902 ordinal: part_ordinal(ordinal),
903 provenance,
904 options: empty_options(),
905 kind: PartKind::Text { text },
906 }
907}
908
909fn user_part(
910 session_id: &str,
911 message_id: &str,
912 ordinal: usize,
913 value: &Value,
914 state: &FileState,
915 provenance: Provenance,
916) -> Part {
917 match value.get("type").and_then(Value::as_str) {
918 Some("text") => text_part(
919 session_id,
920 message_id,
921 ordinal,
922 extract_str(value, "text"),
923 provenance,
924 ),
925 Some("image") | Some("file") => {
926 file_part(session_id, message_id, ordinal, value, provenance)
927 }
928 Some("tool_result") => {
929 tool_result_part(session_id, message_id, ordinal, value, None, state)
930 }
931 _ => text_part(
935 session_id,
936 message_id,
937 ordinal,
938 Some(extract_compact_repr(value)),
939 provenance,
940 ),
941 }
942}
943
944fn assistant_part(session_id: &str, message_id: &str, ordinal: usize, value: &Value) -> Part {
945 match value.get("type").and_then(Value::as_str) {
949 Some("text") => text_part(
950 session_id,
951 message_id,
952 ordinal,
953 extract_str(value, "text"),
954 Provenance::Conversational,
955 ),
956 Some("thinking") => Part {
957 session_id: session_id.to_owned(),
958 id: part_id(message_id, ordinal),
959 message_id: message_id.to_owned(),
960 ordinal: part_ordinal(ordinal),
961 provenance: Provenance::Conversational,
962 options: signature_options(value),
963 kind: PartKind::Reasoning {
964 text: extract_str(value, "thinking"),
965 },
966 },
967 Some("tool_use") => Part {
968 session_id: session_id.to_owned(),
969 id: part_id(message_id, ordinal),
970 message_id: message_id.to_owned(),
971 ordinal: part_ordinal(ordinal),
972 provenance: Provenance::Conversational,
973 options: empty_options(),
974 kind: PartKind::ToolCall {
975 call_id: extract_str(value, "id"),
976 name: extract_str(value, "name"),
977 params: value.get("input").cloned().unwrap_or(Value::Null),
978 provider_executed: false,
979 },
980 },
981 Some("server_tool_use") => Part {
982 session_id: session_id.to_owned(),
983 id: part_id(message_id, ordinal),
984 message_id: message_id.to_owned(),
985 ordinal: part_ordinal(ordinal),
986 provenance: Provenance::Conversational,
987 options: empty_options(),
988 kind: PartKind::ToolCall {
989 call_id: extract_str(value, "id"),
990 name: extract_str(value, "name"),
991 params: value.get("input").cloned().unwrap_or(Value::Null),
992 provider_executed: true,
993 },
994 },
995 Some("image") | Some("file") => file_part(
996 session_id,
997 message_id,
998 ordinal,
999 value,
1000 Provenance::Conversational,
1001 ),
1002 _ => text_part(
1005 session_id,
1006 message_id,
1007 ordinal,
1008 Some(extract_compact_repr(value)),
1009 Provenance::Conversational,
1010 ),
1011 }
1012}
1013
1014fn tool_result_part(
1015 session_id: &str,
1016 message_id: &str,
1017 ordinal: usize,
1018 value: &Value,
1019 source_tool_result: Option<&Value>,
1020 state: &FileState,
1021) -> Part {
1022 let call_id = extract_str(value, "tool_use_id");
1023 let name = value
1029 .str_field("tool_use_id")
1030 .and_then(|id| state.tool_call_names.get(id))
1031 .cloned();
1032 let result = value
1033 .get("content")
1034 .cloned()
1035 .or_else(|| source_tool_result.cloned())
1036 .unwrap_or(Value::Null);
1037 Part {
1038 session_id: session_id.to_owned(),
1039 id: part_id(message_id, ordinal),
1040 message_id: message_id.to_owned(),
1041 ordinal: part_ordinal(ordinal),
1042 provenance: Provenance::Injected,
1045 options: empty_options(),
1046 kind: PartKind::ToolResult {
1047 call_id,
1048 name,
1049 is_failure: value
1050 .get("is_error")
1051 .and_then(Value::as_bool)
1052 .unwrap_or(false),
1053 result,
1054 },
1055 }
1056}
1057
1058fn file_part(
1059 session_id: &str,
1060 message_id: &str,
1061 ordinal: usize,
1062 value: &Value,
1063 provenance: Provenance,
1064) -> Part {
1065 let media_type = value
1066 .get("media_type")
1067 .or_else(|| value.get("mime_type"))
1068 .and_then(Value::as_str)
1069 .map(ToOwned::to_owned);
1070 let file_name = value
1071 .get("file_name")
1072 .or_else(|| value.get("name"))
1073 .and_then(Value::as_str)
1074 .map(ToOwned::to_owned);
1075 let data = if let Some(source) = value.get("source") {
1076 if let Some(url) = source.get("url").and_then(Value::as_str) {
1077 FileData::Url(url.to_owned())
1078 } else if let Some(bytes) = source.get("data").and_then(Value::as_str) {
1079 FileData::String(bytes.to_owned())
1080 } else {
1081 FileData::String(compact_json(source))
1082 }
1083 } else if let Some(url) = value.get("url").and_then(Value::as_str) {
1084 FileData::Url(url.to_owned())
1085 } else {
1086 FileData::String(compact_json(value))
1087 };
1088
1089 Part {
1090 session_id: session_id.to_owned(),
1091 id: part_id(message_id, ordinal),
1092 message_id: message_id.to_owned(),
1093 ordinal: part_ordinal(ordinal),
1094 provenance,
1095 options: empty_options(),
1096 kind: PartKind::File {
1097 media_type,
1098 file_name,
1099 data,
1100 },
1101 }
1102}
1103
1104fn row_options(row: &Value, line: usize) -> ProviderOptions {
1105 let mut options = ProviderOptions::new();
1106 let source = json!({
1107 "line": line,
1108 "parent_uuid": row.get("parentUuid"),
1109 "is_sidechain": row.get("isSidechain"),
1110 "user_type": row.get("userType"),
1111 "entrypoint": row.get("entrypoint"),
1112 "cwd": row.get("cwd"),
1113 "version": row.get("version"),
1114 "git_branch": row.get("gitBranch"),
1115 "request_id": row.get("requestId"),
1116 "raw_type": row.get("type"),
1117 "raw_record": extract_raw_record(row),
1118 });
1119 options.insert("source".to_owned(), source);
1120 options
1121}
1122
1123fn assistant_options(row: &Value, message_value: &Value, line: usize) -> ProviderOptions {
1124 let mut options = row_options(row, line);
1125 let anthropic = json!({
1126 "id": message_value.get("id"),
1127 "model": message_value.get("model"),
1128 "stop_reason": message_value.get("stop_reason"),
1129 "stop_sequence": message_value.get("stop_sequence"),
1130 "usage": message_value.get("usage"),
1131 });
1132 options.insert("anthropic".to_owned(), anthropic);
1133 options
1134}
1135
1136fn signature_options(value: &Value) -> ProviderOptions {
1137 let mut options = ProviderOptions::new();
1138 if let Some(signature) = value.get("signature").and_then(Value::as_str) {
1139 options.insert("anthropic".to_owned(), json!({"signature": signature}));
1140 }
1141 options
1142}
1143
1144fn attachment_content(value: &Value) -> Option<Extracted<String>> {
1145 extract_str(value, "content").or_else(|| extract_str(value, "stdout"))
1146}
1147
1148fn parse_timestamp(value: &Value) -> anyhow::Result<DateTime<Utc>> {
1149 let timestamp = value
1150 .get("timestamp")
1151 .and_then(Value::as_str)
1152 .context("missing timestamp")?;
1153 Ok(DateTime::parse_from_rfc3339(timestamp)
1154 .context("invalid timestamp")?
1155 .with_timezone(&Utc))
1156}
1157
1158fn is_tool_result(value: &Value) -> bool {
1159 value.get("type").and_then(Value::as_str) == Some("tool_result")
1160}
1161
1162fn is_meta_row(row: &Value) -> bool {
1165 row.get("isMeta").and_then(Value::as_bool) == Some(true)
1166}
1167
/// True when `text`, ignoring leading whitespace, starts with one of the
/// markers listed in `INJECTED_PREFIXES`.
fn is_injected_user_text(text: &str) -> bool {
    const INJECTED_PREFIXES: [&str; 7] = [
        "<task-notification>",
        "<command-name>",
        "<command-message>",
        "<command-args>",
        "<local-command-caveat>",
        "<local-command-stdout>",
        "[Request interrupted by user",
    ];

    let body = text.trim_start();
    INJECTED_PREFIXES
        .iter()
        .any(|prefix| body.starts_with(*prefix))
}
1181
1182fn user_text_provenance(row: &Value, text: &str) -> Provenance {
1185 if is_meta_row(row) || is_injected_user_text(text) {
1186 Provenance::Injected
1187 } else {
1188 Provenance::Conversational
1189 }
1190}
1191
1192fn user_array_provenance(row: &Value, items: &[Value]) -> Provenance {
1196 if is_meta_row(row) {
1197 return Provenance::Injected;
1198 }
1199 let wrapped = items.iter().any(|item| {
1200 item.get("type").and_then(Value::as_str) == Some("text")
1201 && item
1202 .get("text")
1203 .and_then(Value::as_str)
1204 .is_some_and(is_injected_user_text)
1205 });
1206 if wrapped {
1207 Provenance::Injected
1208 } else {
1209 Provenance::Conversational
1210 }
1211}
1212
1213#[cfg(test)]
1214mod tests {
1215 #![allow(clippy::expect_used, clippy::unwrap_used)]
1223
1224 use super::*;
1225 use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
1226 use tempfile::TempDir;
1227
1228 const FIXTURE_ROOT: &str = concat!(
1231 env!("CARGO_MANIFEST_DIR"),
1232 "/tests/fixtures/adapter/claude_code/projects"
1233 );
1234
    /// Runs the shared `assert_probe_default` helper against the factory with
    /// the path components `.claude` / `projects`.
    // NOTE(review): presumably the helper creates that directory under a
    // temporary home and checks the probe result; confirm in `test_support`.
    #[test]
    fn probe_default_finds_claude_projects_under_home() -> anyhow::Result<()> {
        crate::adapter::test_support::assert_probe_default(
            &ClaudeCodeFactory,
            &[".claude", "projects"],
        )
    }
1242
    /// Two records that differ only in fields outside the hashed set
    /// (`timestamp`, `requestId`, `isMeta`, `gitBranch`, `version`) must hash
    /// equal, while a change to `message.content` must change the hash.
    #[test]
    fn source_record_hash_ignores_noise_keeps_semantic_diffs() {
        // Reference record.
        let base = serde_json::json!({
            "uuid": "u1",
            "type": "user",
            "parentUuid": null,
            "message": {"role": "user", "content": "hi"},
            "timestamp": "2026-06-17T00:00:00Z",
            "requestId": "req-A",
            "isMeta": false,
            "gitBranch": "main",
            "version": "2.1.56",
        });
        // Same hashed fields as `base`; every other field differs.
        let noise_diff = serde_json::json!({
            "uuid": "u1",
            "type": "user",
            "parentUuid": null,
            "message": {"role": "user", "content": "hi"},
            "timestamp": "2026-06-17T00:00:05Z",
            "requestId": "req-B",
            "isMeta": true,
            "gitBranch": "feat/x",
            "version": "2.1.57",
        });
        // Same uuid as `base`, but different message content.
        let content_diff = serde_json::json!({
            "uuid": "u1",
            "type": "user",
            "parentUuid": null,
            "message": {"role": "user", "content": "different"},
            "timestamp": "2026-06-17T00:00:00Z",
        });
        assert_eq!(
            source_record_hash(&base),
            source_record_hash(&noise_diff),
            "noise-field differences must dedupe",
        );
        assert_ne!(
            source_record_hash(&base),
            source_record_hash(&content_diff),
            "semantic content differences must not dedupe",
        );
    }
1289
    /// Runs the shared `assert_native_restore` helper with an adapter rooted
    /// at the fixture corpus.
    // NOTE(review): judging by the test name, the helper ingests the fixtures
    // and checks that a native restore is value-equal to them; confirm in
    // `test_support`.
    #[tokio::test(flavor = "multi_thread")]
    async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
        let adapter = ClaudeCodeAdapter::new(FIXTURE_ROOT);
        crate::adapter::test_support::assert_native_restore(
            &ClaudeCodeFactory,
            &adapter,
            std::path::Path::new(FIXTURE_ROOT),
        )
        .await
    }
1300
1301 #[tokio::test(flavor = "multi_thread")]
1309 async fn subagent_file_derives_child_session_with_parent_link() -> anyhow::Result<()> {
1310 let corpus = TempDir::new()?;
1311 let project_dir = corpus.path().join("-tmp-pond-test");
1312 let parent_uuid = "11111111-1111-1111-1111-111111111111";
1313 let agent_hash = "abc123def456";
1314 std::fs::create_dir_all(project_dir.join(parent_uuid).join("subagents"))?;
1315
1316 let parent_row = serde_json::json!({
1318 "type": "user",
1319 "uuid": "u-parent-1",
1320 "sessionId": parent_uuid,
1321 "cwd": "/tmp/pond-test",
1322 "timestamp": "2026-05-16T00:00:00.000Z",
1323 "version": "2.1.121",
1324 "message": {"role": "user", "content": "hi parent"},
1325 });
1326 std::fs::write(
1327 project_dir.join(format!("{parent_uuid}.jsonl")),
1328 format!("{parent_row}\n"),
1329 )?;
1330
1331 let subagent_row = serde_json::json!({
1334 "type": "user",
1335 "uuid": "u-sub-1",
1336 "sessionId": parent_uuid,
1337 "cwd": "/tmp/pond-test",
1338 "isSidechain": true,
1339 "agentId": agent_hash,
1340 "timestamp": "2026-05-16T00:01:00.000Z",
1341 "version": "2.1.121",
1342 "message": {"role": "user", "content": "subagent prompt"},
1343 });
1344 std::fs::write(
1345 project_dir
1346 .join(parent_uuid)
1347 .join("subagents")
1348 .join(format!("agent-{agent_hash}.jsonl")),
1349 format!("{subagent_row}\n"),
1350 )?;
1351 std::fs::write(
1352 project_dir
1353 .join(parent_uuid)
1354 .join("subagents")
1355 .join(format!("agent-{agent_hash}.meta.json")),
1356 r#"{"agentType":"general-purpose","description":"do a thing"}"#,
1357 )?;
1358
1359 let store_dir = TempDir::new()?;
1360 let store = Store::open_local(store_dir.path()).await?;
1361 let adapter = ClaudeCodeAdapter::new(corpus.path());
1362
1363 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1364 assert_eq!(
1365 summary.dropped_sessions, 0,
1366 "subagent file must NOT collide with parent (pre-fix this was the project-immutable rejection)"
1367 );
1368
1369 let parent = store
1370 .get_session(parent_uuid)
1371 .await?
1372 .expect("parent session should ingest as the bare uuid");
1373 assert_eq!(parent.session.source_agent, "claude-code");
1374 assert_eq!(parent.session.parent_session_id, None);
1375
1376 let child_id = format!("{parent_uuid}/agent-{agent_hash}");
1377 let child = store
1378 .get_session(&child_id)
1379 .await?
1380 .expect("subagent session must surface under the derived id");
1381 assert_eq!(
1382 child.session.source_agent, "claude-code/general-purpose",
1383 "agent_type from .meta.json should suffix the source_agent label"
1384 );
1385 assert_eq!(
1386 child.session.parent_session_id.as_deref(),
1387 Some(parent_uuid),
1388 "subagent must link back to parent via parent_session_id",
1389 );
1390 let subagent_meta = child
1391 .session
1392 .options
1393 .get("subagent")
1394 .expect("options.subagent must carry the hash + verbatim meta.json");
1395 assert_eq!(subagent_meta["hash"], serde_json::json!(agent_hash));
1396 assert_eq!(
1397 subagent_meta["meta"]["agentType"],
1398 serde_json::json!("general-purpose")
1399 );
1400 assert_eq!(
1401 subagent_meta["meta"]["description"],
1402 serde_json::json!("do a thing")
1403 );
1404 Ok(())
1405 }
1406
1407 #[tokio::test(flavor = "multi_thread")]
1411 async fn subagent_without_meta_falls_back_to_generic_label() -> anyhow::Result<()> {
1412 let corpus = TempDir::new()?;
1413 let project_dir = corpus.path().join("-tmp-pond-test");
1414 let parent_uuid = "22222222-2222-2222-2222-222222222222";
1415 let agent_hash = "deadbeef";
1416 std::fs::create_dir_all(project_dir.join(parent_uuid).join("subagents"))?;
1417 let row = serde_json::json!({
1418 "type": "user",
1419 "uuid": "u-sub-only",
1420 "sessionId": parent_uuid,
1421 "cwd": "/tmp/pond-test",
1422 "timestamp": "2026-05-16T00:00:00.000Z",
1423 "message": {"role": "user", "content": "no meta sibling here"},
1424 });
1425 std::fs::write(
1426 project_dir
1427 .join(parent_uuid)
1428 .join("subagents")
1429 .join(format!("agent-{agent_hash}.jsonl")),
1430 format!("{row}\n"),
1431 )?;
1432
1433 let store_dir = TempDir::new()?;
1434 let store = Store::open_local(store_dir.path()).await?;
1435 let adapter = ClaudeCodeAdapter::new(corpus.path());
1436 let _summary =
1437 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1438
1439 let child = store
1440 .get_session(&format!("{parent_uuid}/agent-{agent_hash}"))
1441 .await?
1442 .expect("derived child id even without meta");
1443 assert_eq!(child.session.source_agent, "claude-code/subagent");
1444 Ok(())
1445 }
1446
1447 #[tokio::test(flavor = "multi_thread")]
1455 async fn workflow_nested_subagent_derives_distinct_child_not_parent_collision()
1456 -> anyhow::Result<()> {
1457 let corpus = TempDir::new()?;
1458 let project_dir = corpus.path().join("-tmp-pond-test");
1459 let parent_uuid = "44444444-4444-4444-4444-444444444444";
1460 let wf_id = "wf_abcd1234-ef0";
1461 let agent_hash = "cafef00dbaadf00d1";
1462 let wf_dir = project_dir
1463 .join(parent_uuid)
1464 .join("subagents")
1465 .join("workflows")
1466 .join(wf_id);
1467 std::fs::create_dir_all(&wf_dir)?;
1468
1469 let parent_row = serde_json::json!({
1470 "type": "user",
1471 "uuid": "u-parent-1",
1472 "sessionId": parent_uuid,
1473 "cwd": "/tmp/pond-test",
1474 "timestamp": "2026-05-20T00:00:00.000Z",
1475 "message": {"role": "user", "content": "hi parent"},
1476 });
1477 std::fs::write(
1478 project_dir.join(format!("{parent_uuid}.jsonl")),
1479 format!("{parent_row}\n"),
1480 )?;
1481
1482 let subagent_row = serde_json::json!({
1484 "type": "user",
1485 "uuid": "u-wf-sub-1",
1486 "sessionId": parent_uuid,
1487 "cwd": "/tmp/pond-test/packages/sub",
1488 "isSidechain": true,
1489 "agentId": agent_hash,
1490 "timestamp": "2026-05-20T00:01:00.000Z",
1491 "message": {"role": "user", "content": "workflow subagent prompt"},
1492 });
1493 std::fs::write(
1494 wf_dir.join(format!("agent-{agent_hash}.jsonl")),
1495 format!("{subagent_row}\n"),
1496 )?;
1497 std::fs::write(
1498 wf_dir.join(format!("agent-{agent_hash}.meta.json")),
1499 r#"{"agentType":"general-purpose","description":"workflow child"}"#,
1500 )?;
1501
1502 let store_dir = TempDir::new()?;
1503 let store = Store::open_local(store_dir.path()).await?;
1504 let adapter = ClaudeCodeAdapter::new(corpus.path());
1505 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1506 assert_eq!(
1507 summary.dropped_sessions, 0,
1508 "nested workflow subagent must NOT collide with the parent project",
1509 );
1510
1511 let parent = store
1512 .get_session(parent_uuid)
1513 .await?
1514 .expect("parent session ingests under the bare uuid");
1515 assert_eq!(&*parent.session.project, "/tmp/pond-test");
1516 assert_eq!(parent.session.parent_session_id, None);
1517
1518 let child_id = format!("{parent_uuid}/workflows/{wf_id}/agent-{agent_hash}");
1519 let child = store
1520 .get_session(&child_id)
1521 .await?
1522 .expect("workflow subagent surfaces under the full nested child id");
1523 assert_eq!(child.session.source_agent, "claude-code/general-purpose");
1524 assert_eq!(
1525 child.session.parent_session_id.as_deref(),
1526 Some(parent_uuid)
1527 );
1528 assert_eq!(
1529 &*child.session.project, "/tmp/pond-test/packages/sub",
1530 "child keeps its own cwd-derived project, distinct from the parent",
1531 );
1532 let subagent_meta = child
1533 .session
1534 .options
1535 .get("subagent")
1536 .expect("options.subagent present");
1537 assert_eq!(subagent_meta["hash"], serde_json::json!(agent_hash));
1538 Ok(())
1539 }
1540
1541 #[tokio::test(flavor = "multi_thread")]
1547 async fn unrecognized_subagents_file_fails_visibly_not_merged() -> anyhow::Result<()> {
1548 let corpus = TempDir::new()?;
1549 let project_dir = corpus.path().join("-tmp-pond-test");
1550 let parent_uuid = "55555555-5555-5555-5555-555555555555";
1551 let unknown_dir = project_dir
1552 .join(parent_uuid)
1553 .join("subagents")
1554 .join("workflows")
1555 .join("wf_future01-aaa");
1556 std::fs::create_dir_all(&unknown_dir)?;
1557
1558 let parent_row = serde_json::json!({
1559 "type": "user",
1560 "uuid": "u-parent-only",
1561 "sessionId": parent_uuid,
1562 "cwd": "/tmp/pond-test",
1563 "timestamp": "2026-05-20T00:00:00.000Z",
1564 "message": {"role": "user", "content": "parent message"},
1565 });
1566 std::fs::write(
1567 project_dir.join(format!("{parent_uuid}.jsonl")),
1568 format!("{parent_row}\n"),
1569 )?;
1570
1571 let unknown_row = serde_json::json!({
1574 "type": "user",
1575 "uuid": "u-should-not-merge",
1576 "sessionId": parent_uuid,
1577 "cwd": "/tmp/pond-test",
1578 "timestamp": "2026-05-20T00:02:00.000Z",
1579 "message": {"role": "user", "content": "must not land under parent"},
1580 });
1581 std::fs::write(
1582 unknown_dir.join("transcript-001.jsonl"),
1583 format!("{unknown_row}\n"),
1584 )?;
1585
1586 let store_dir = TempDir::new()?;
1587 let store = Store::open_local(store_dir.path()).await?;
1588 let adapter = ClaudeCodeAdapter::new(corpus.path());
1589 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1590
1591 assert_eq!(
1592 summary.skipped_files, 1,
1593 "the unrecognized subagents/ transcript must be a visible, counted skip",
1594 );
1595 let parent = store
1596 .get_session(parent_uuid)
1597 .await?
1598 .expect("parent session ingests");
1599 assert_eq!(
1600 parent.messages.len(),
1601 1,
1602 "the unrecognized file's row must NOT be merged into the parent session",
1603 );
1604 assert!(
1605 parent
1606 .messages
1607 .iter()
1608 .all(|m| m.message.id() != "u-should-not-merge"),
1609 "parent must not absorb the unrecognized file's message",
1610 );
1611 Ok(())
1612 }
1613
1614 #[tokio::test(flavor = "multi_thread")]
1622 async fn unrecognized_subagents_file_stays_visible_under_parent_watermark() -> anyhow::Result<()>
1623 {
1624 struct ParentAlreadyFresh;
1625 impl crate::adapter::SkipOracle for ParentAlreadyFresh {
1626 fn session_max_ts(&self, _session_id: &str) -> Option<i64> {
1627 Some(i64::MAX)
1631 }
1632 fn is_empty(&self) -> bool {
1633 false
1634 }
1635 }
1636
1637 let corpus = TempDir::new()?;
1638 let project_dir = corpus.path().join("-tmp-pond-test");
1639 let parent_uuid = "66666666-6666-6666-6666-666666666666";
1640 let unknown_dir = project_dir
1641 .join(parent_uuid)
1642 .join("subagents")
1643 .join("workflows")
1644 .join("wf_future02-bbb");
1645 std::fs::create_dir_all(&unknown_dir)?;
1646
1647 let parent_row = serde_json::json!({
1648 "type": "user",
1649 "uuid": "u-parent-fresh",
1650 "sessionId": parent_uuid,
1651 "cwd": "/tmp/pond-test",
1652 "timestamp": "2026-05-20T00:00:00.000Z",
1653 "message": {"role": "user", "content": "parent message"},
1654 });
1655 std::fs::write(
1656 project_dir.join(format!("{parent_uuid}.jsonl")),
1657 format!("{parent_row}\n"),
1658 )?;
1659
1660 let unknown_row = serde_json::json!({
1663 "type": "user",
1664 "uuid": "u-resync-should-stay-visible",
1665 "sessionId": parent_uuid,
1666 "cwd": "/tmp/pond-test",
1667 "timestamp": "2026-05-20T00:02:00.000Z",
1668 "message": {"role": "user", "content": "must stay visible"},
1669 });
1670 std::fs::write(
1671 unknown_dir.join("transcript-002.jsonl"),
1672 format!("{unknown_row}\n"),
1673 )?;
1674
1675 let store_dir = TempDir::new()?;
1676 let store = Store::open_local(store_dir.path()).await?;
1677 let adapter = ClaudeCodeAdapter::new(corpus.path());
1678 let summary = ingest_adapter(&store, &adapter, &ParentAlreadyFresh, |_| {}).await?;
1679
1680 assert_eq!(
1681 summary.skipped_files, 1,
1682 "the unrecognized transcript must stay a visible Unsupported skip, not be fresh-skipped under the parent's watermark",
1683 );
1684 assert_eq!(
1687 summary.skipped_fresh, 1,
1688 "only the parent may fresh-skip; the unrecognized file must not borrow its watermark",
1689 );
1690 Ok(())
1691 }
1692
1693 #[tokio::test(flavor = "multi_thread")]
1698 async fn replay_duplicates_are_dedup_at_adapter_layer() -> anyhow::Result<()> {
1699 let corpus = TempDir::new()?;
1700 let project_dir = corpus.path().join("-tmp-pond-test");
1701 std::fs::create_dir_all(&project_dir)?;
1702 let session_uuid = "33333333-3333-3333-3333-333333333333";
1703 let dup_uuid = "u-shared-1";
1704 let row = serde_json::json!({
1705 "type": "user",
1706 "uuid": dup_uuid,
1707 "sessionId": session_uuid,
1708 "cwd": "/tmp/pond-test",
1709 "timestamp": "2026-05-16T00:00:00.000Z",
1710 "message": {"role": "user", "content": "replayed three times"},
1711 });
1712 let body = format!("{row}\n{row}\n{row}\n");
1714 std::fs::write(project_dir.join(format!("{session_uuid}.jsonl")), body)?;
1715
1716 let store_dir = TempDir::new()?;
1717 let store = Store::open_local(store_dir.path()).await?;
1718 let adapter = ClaudeCodeAdapter::new(corpus.path());
1719 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1720
1721 assert_eq!(
1722 summary.dropped_events, 0,
1723 "adapter must dedupe replays before they reach the validator"
1724 );
1725 assert!(
1726 !summary
1727 .drop_reasons
1728 .contains_key(crate::sessions::DROP_REASON_DUPLICATE_MESSAGE_ID),
1729 "duplicate_message_id bucket stays empty when adapter does its job"
1730 );
1731 Ok(())
1732 }
1733
1734 #[tokio::test(flavor = "multi_thread")]
1735 async fn same_uuid_different_content_is_visible_duplicate_not_adapter_drop()
1736 -> anyhow::Result<()> {
1737 let corpus = TempDir::new()?;
1738 let project_dir = corpus.path().join("-tmp-pond-test");
1739 std::fs::create_dir_all(&project_dir)?;
1740 let session_uuid = "33333333-3333-3333-3333-333333333334";
1741 let dup_uuid = "u-shared-different";
1742 let first = serde_json::json!({
1743 "type": "user",
1744 "uuid": dup_uuid,
1745 "sessionId": session_uuid,
1746 "cwd": "/tmp/pond-test",
1747 "timestamp": "2026-05-16T00:00:00.000Z",
1748 "message": {"role": "user", "content": "first content"},
1749 });
1750 let second = serde_json::json!({
1751 "type": "user",
1752 "uuid": dup_uuid,
1753 "sessionId": session_uuid,
1754 "cwd": "/tmp/pond-test",
1755 "timestamp": "2026-05-16T00:00:01.000Z",
1756 "message": {"role": "user", "content": "changed content"},
1757 });
1758 std::fs::write(
1759 project_dir.join(format!("{session_uuid}.jsonl")),
1760 format!("{first}\n{second}\n"),
1761 )?;
1762
1763 let store_dir = TempDir::new()?;
1764 let store = Store::open_local(store_dir.path()).await?;
1765 let adapter = ClaudeCodeAdapter::new(corpus.path());
1766 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1767
1768 assert_eq!(
1769 summary
1770 .drop_reasons
1771 .get(crate::sessions::DROP_REASON_DUPLICATE_MESSAGE_ID)
1772 .copied(),
1773 Some(1),
1774 "same uuid with changed content must reach the visible duplicate-id path",
1775 );
1776 Ok(())
1777 }
1778
1779 #[tokio::test(flavor = "multi_thread")]
1780 async fn session_row_without_messages_does_not_fresh_skip_source() -> anyhow::Result<()> {
1781 let corpus = TempDir::new()?;
1782 let project_dir = corpus.path().join("-tmp-pond-test");
1783 std::fs::create_dir_all(&project_dir)?;
1784 let session_uuid = "33333333-3333-3333-3333-333333333335";
1785 let row = serde_json::json!({
1786 "type": "user",
1787 "uuid": "u-after-partial",
1788 "sessionId": session_uuid,
1789 "cwd": "/tmp/pond-test",
1790 "timestamp": "2026-05-16T00:00:00.000Z",
1791 "message": {"role": "user", "content": "healed by replay"},
1792 });
1793 std::fs::write(
1794 project_dir.join(format!("{session_uuid}.jsonl")),
1795 format!("{row}\n"),
1796 )?;
1797
1798 let store_dir = TempDir::new()?;
1799 let store = Store::open_local(store_dir.path()).await?;
1800 store
1801 .upsert_sessions(&[Session {
1802 id: session_uuid.to_owned(),
1803 parent_session_id: None,
1804 parent_message_id: None,
1805 source_agent: "claude-code".to_owned(),
1806 created_at: DateTime::parse_from_rfc3339("2026-05-16T00:00:00.000Z")?
1807 .with_timezone(&Utc),
1808 project: Extracted::from_test_value("/tmp/pond-test".to_owned()),
1809 options: ProviderOptions::new(),
1810 }])
1811 .await?;
1812
1813 let last_ids = store.session_last_message_ids().await?;
1814 assert!(
1815 !last_ids.contains_key(session_uuid),
1816 "a session row without messages must not produce a freshness key",
1817 );
1818 let adapter = ClaudeCodeAdapter::new(corpus.path());
1819 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1820 assert_eq!(summary.skipped_fresh, 0);
1821 let session = store
1822 .get_session(session_uuid)
1823 .await?
1824 .expect("session row exists");
1825 assert_eq!(session.messages.len(), 1, "replay must heal messages");
1826 Ok(())
1827 }
1828
1829 #[test]
1835 fn peek_last_ts_walks_back_past_trailing_metadata_rows() {
1836 let corpus = TempDir::new().unwrap();
1837 let project_dir = corpus.path().join("-tmp-pond-test");
1838 std::fs::create_dir_all(&project_dir).unwrap();
1839 let session_uuid = "44444444-4444-4444-4444-444444444444";
1840 let message = serde_json::json!({
1841 "type": "user",
1842 "uuid": "u-1",
1843 "sessionId": session_uuid,
1844 "cwd": "/tmp/pond-test",
1845 "timestamp": "2026-05-16T00:00:00.000Z",
1846 "message": {"role": "user", "content": "hello"},
1847 });
1848 let last_prompt =
1850 serde_json::json!({"type": "last-prompt", "sessionId": session_uuid, "prompt": "hi"});
1851 let permission = serde_json::json!({"type": "permission-mode", "sessionId": session_uuid});
1852 let path = project_dir.join(format!("{session_uuid}.jsonl"));
1853 std::fs::write(&path, format!("{message}\n{last_prompt}\n{permission}\n")).unwrap();
1854
1855 let adapter = ClaudeCodeAdapter::new(corpus.path());
1856 let expected = DateTime::parse_from_rfc3339("2026-05-16T00:00:00.000Z")
1857 .unwrap()
1858 .timestamp_micros();
1859 assert_eq!(
1860 adapter.peek_last_ts(&path),
1861 Some(expected),
1862 "walk back past trailing metadata to the last message's timestamp",
1863 );
1864 }
1865
1866 #[tokio::test(flavor = "multi_thread")]
1870 async fn tool_result_name_resolves_from_prior_tool_use_in_same_file() -> anyhow::Result<()> {
1871 let corpus = TempDir::new()?;
1872 let project_dir = corpus.path().join("-tmp-pond-test");
1873 std::fs::create_dir_all(&project_dir)?;
1874 let session_uuid = "44444444-4444-4444-4444-444444444444";
1875 let call_id = "toolu_test_01";
1876
1877 let tool_use_row = serde_json::json!({
1878 "type": "assistant",
1879 "uuid": "u-call",
1880 "sessionId": session_uuid,
1881 "cwd": "/tmp/pond-test",
1882 "timestamp": "2026-05-16T00:00:00.000Z",
1883 "message": {
1884 "role": "assistant",
1885 "content": [{
1886 "type": "tool_use",
1887 "id": call_id,
1888 "name": "Edit",
1889 "input": {"file_path": "/tmp/foo"},
1890 }],
1891 },
1892 });
1893 let tool_result_row = serde_json::json!({
1894 "type": "user",
1895 "uuid": "u-result",
1896 "sessionId": session_uuid,
1897 "cwd": "/tmp/pond-test",
1898 "timestamp": "2026-05-16T00:00:01.000Z",
1899 "message": {
1900 "role": "user",
1901 "content": [{
1902 "type": "tool_result",
1903 "tool_use_id": call_id,
1904 "content": "ok",
1905 }],
1906 },
1907 });
1908 std::fs::write(
1909 project_dir.join(format!("{session_uuid}.jsonl")),
1910 format!("{tool_use_row}\n{tool_result_row}\n"),
1911 )?;
1912
1913 let store_dir = TempDir::new()?;
1914 let store = Store::open_local(store_dir.path()).await?;
1915 let adapter = ClaudeCodeAdapter::new(corpus.path());
1916 let _summary =
1917 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1918 let session = store
1919 .get_session(session_uuid)
1920 .await?
1921 .expect("session ingests");
1922
1923 let mut saw_call = false;
1924 let mut saw_result = false;
1925 for stored in &session.messages {
1926 for part in &stored.parts {
1927 match &part.kind {
1928 PartKind::ToolCall {
1929 call_id: cid, name, ..
1930 } => {
1931 assert_eq!(cid.as_ref().map(|e| e.as_str()), Some(call_id));
1932 assert_eq!(
1933 name.as_ref().map(|e| e.as_str()),
1934 Some("Edit"),
1935 "tool_use carries the name directly"
1936 );
1937 saw_call = true;
1938 }
1939 PartKind::ToolResult {
1940 call_id: cid, name, ..
1941 } => {
1942 assert_eq!(cid.as_ref().map(|e| e.as_str()), Some(call_id));
1943 assert_eq!(
1944 name.as_ref().map(|e| e.as_str()),
1945 Some("Edit"),
1946 "tool_result resolves the name via the per-file map (was 'unknown' pre-2026-05-16)"
1947 );
1948 saw_result = true;
1949 }
1950 _ => {}
1951 }
1952 }
1953 }
1954 assert!(saw_call && saw_result, "both parts must be present");
1955 Ok(())
1956 }
1957
1958 #[test]
1962 fn user_text_provenance_separates_prompts_from_harness_injection() {
1963 let prompt = json!({"type": "user", "uuid": "u1"});
1964 assert_eq!(
1965 user_text_provenance(&prompt, "please refactor the parser"),
1966 Provenance::Conversational,
1967 );
1968
1969 let notification = json!({"type": "user", "uuid": "u2"});
1970 assert_eq!(
1971 user_text_provenance(
1972 ¬ification,
1973 "<task-notification>background task done</task-notification>",
1974 ),
1975 Provenance::Injected,
1976 );
1977
1978 let meta = json!({"type": "user", "uuid": "u3", "isMeta": true});
1979 assert_eq!(
1980 user_text_provenance(&meta, "expanded skill body"),
1981 Provenance::Injected,
1982 );
1983 }
1984
1985 #[tokio::test(flavor = "multi_thread")]
1989 async fn task_notification_message_yields_injected_parts() -> anyhow::Result<()> {
1990 let corpus = TempDir::new()?;
1991 let project_dir = corpus.path().join("-tmp-pond-test");
1992 std::fs::create_dir_all(&project_dir)?;
1993 let session_uuid = "66666666-6666-6666-6666-666666666666";
1994 let prompt = serde_json::json!({
1995 "type": "user",
1996 "uuid": "u-prompt",
1997 "sessionId": session_uuid,
1998 "cwd": "/tmp/pond-test",
1999 "timestamp": "2026-05-16T00:00:00.000Z",
2000 "message": {"role": "user", "content": "genuine human prompt"},
2001 });
2002 let notification = serde_json::json!({
2003 "type": "user",
2004 "uuid": "u-notify",
2005 "sessionId": session_uuid,
2006 "cwd": "/tmp/pond-test",
2007 "timestamp": "2026-05-16T00:00:01.000Z",
2008 "message": {
2009 "role": "user",
2010 "content": "<task-notification>a background task finished</task-notification>",
2011 },
2012 });
2013 std::fs::write(
2014 project_dir.join(format!("{session_uuid}.jsonl")),
2015 format!("{prompt}\n{notification}\n"),
2016 )?;
2017
2018 let store_dir = TempDir::new()?;
2019 let store = Store::open_local(store_dir.path()).await?;
2020 let adapter = ClaudeCodeAdapter::new(corpus.path());
2021 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
2022
2023 let session = store
2024 .get_session(session_uuid)
2025 .await?
2026 .expect("session ingests");
2027 let mut saw_prompt = false;
2028 let mut saw_notification = false;
2029 for stored in &session.messages {
2030 for part in &stored.parts {
2031 if stored.message.id() == "u-prompt" {
2032 assert_eq!(part.provenance, crate::wire::Provenance::Conversational);
2033 saw_prompt = true;
2034 }
2035 if stored.message.id() == "u-notify" {
2036 assert_eq!(part.provenance, crate::wire::Provenance::Injected);
2037 saw_notification = true;
2038 }
2039 }
2040 }
2041 assert!(saw_prompt && saw_notification, "both messages present");
2042 Ok(())
2043 }
2044
2045 #[tokio::test(flavor = "multi_thread")]
2049 async fn orphan_tool_result_yields_name_none_not_unknown_sentinel() -> anyhow::Result<()> {
2050 let corpus = TempDir::new()?;
2051 let project_dir = corpus.path().join("-tmp-pond-test");
2052 std::fs::create_dir_all(&project_dir)?;
2053 let session_uuid = "55555555-5555-5555-5555-555555555555";
2054
2055 let row = serde_json::json!({
2057 "type": "user",
2058 "uuid": "u-orphan",
2059 "sessionId": session_uuid,
2060 "cwd": "/tmp/pond-test",
2061 "timestamp": "2026-05-16T00:00:00.000Z",
2062 "message": {
2063 "role": "user",
2064 "content": [{
2065 "type": "tool_result",
2066 "tool_use_id": "toolu_orphan",
2067 "content": "result body, no matching call",
2068 }],
2069 },
2070 });
2071 std::fs::write(
2072 project_dir.join(format!("{session_uuid}.jsonl")),
2073 format!("{row}\n"),
2074 )?;
2075
2076 let store_dir = TempDir::new()?;
2077 let store = Store::open_local(store_dir.path()).await?;
2078 let adapter = ClaudeCodeAdapter::new(corpus.path());
2079 let _summary =
2080 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
2081 let session = store
2082 .get_session(session_uuid)
2083 .await?
2084 .expect("session ingests");
2085 let mut found = false;
2086 for stored in &session.messages {
2087 for part in &stored.parts {
2088 if let PartKind::ToolResult { name, call_id, .. } = &part.kind {
2089 assert_eq!(call_id.as_ref().map(|e| e.as_str()), Some("toolu_orphan"));
2090 assert!(
2091 name.is_none(),
2092 "orphan tool_result must be name=None, not synthesized 'unknown'",
2093 );
2094 found = true;
2095 }
2096 }
2097 }
2098 assert!(found, "orphan tool_result part must be present");
2099 Ok(())
2101 }
2102
2103 #[tokio::test(flavor = "multi_thread")]
2104 async fn unknown_message_role_becomes_lossless_carrier() -> anyhow::Result<()> {
2105 let corpus = TempDir::new()?;
2106 let project_dir = corpus.path().join("-tmp-pond-test");
2107 std::fs::create_dir_all(&project_dir)?;
2108 let session_uuid = "66666666-6666-6666-6666-666666666666";
2109
2110 let row = serde_json::json!({
2113 "type": "user",
2114 "uuid": "u-future",
2115 "sessionId": session_uuid,
2116 "cwd": "/tmp/pond-test",
2117 "timestamp": "2026-05-16T00:00:00.000Z",
2118 "message": {
2119 "role": "future_role",
2120 "content": "keep me",
2121 },
2122 });
2123 std::fs::write(
2124 project_dir.join(format!("{session_uuid}.jsonl")),
2125 format!("{row}\n"),
2126 )?;
2127
2128 let store_dir = TempDir::new()?;
2129 let store = Store::open_local(store_dir.path()).await?;
2130 let adapter = ClaudeCodeAdapter::new(corpus.path());
2131 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
2132 assert!(
2133 summary.drop_reasons.is_empty(),
2134 "an unknown role must be carried, not dropped: {:?}",
2135 summary.drop_reasons,
2136 );
2137 let session = store
2138 .get_session(session_uuid)
2139 .await?
2140 .expect("session with the carried record ingests");
2141 let carrier = session
2142 .messages
2143 .iter()
2144 .find(|stored| stored.message.id() == "u-future")
2145 .expect("the unknown-role record lands as a message");
2146 assert!(
2147 matches!(&carrier.message, Message::System { content, .. }
2148 if content.as_deref().is_some_and(|c| c.contains("future_role"))),
2149 "unmapped role must become a System carrier preserving the record",
2150 );
2151 Ok(())
2152 }
2153
2154 #[test]
2158 fn workflow_journal_is_a_control_file_not_unsupported() {
2159 let adapter = ClaudeCodeAdapter::new("/tmp/pond-test-root");
2160 let journal = std::path::Path::new(
2161 "/root/-proj/55555555-5555-5555-5555-555555555555/subagents/workflows/wf_030e6487-da6/journal.jsonl",
2162 );
2163 assert!(is_workflow_control_file(journal));
2164 assert!(
2165 adapter.unsupported_reason(journal).is_none(),
2166 "journal.jsonl is a known control file, not an unsupported layout",
2167 );
2168 }
2169
2170 #[test]
2174 fn unknown_subagents_leaf_is_still_unsupported() {
2175 let adapter = ClaudeCodeAdapter::new("/tmp/pond-test-root");
2176 let unknown = std::path::Path::new(
2177 "/root/-proj/PARENT/subagents/workflows/wf_x/transcript-001.jsonl",
2178 );
2179 assert!(
2180 adapter.unsupported_reason(unknown).is_some(),
2181 "an unrecognized non-agent, non-journal leaf must still fail visibly",
2182 );
2183 assert!(!is_workflow_control_file(unknown));
2184
2185 let agent = std::path::Path::new("/root/-proj/PARENT/subagents/agent-abc123def456.jsonl");
2186 assert!(
2187 adapter.unsupported_reason(agent).is_none(),
2188 "a recognized agent transcript is resolvable, not unsupported",
2189 );
2190 }
2191
2192 #[tokio::test(flavor = "multi_thread")]
2197 async fn workflow_journal_skipped_benignly_while_sibling_agent_ingests() -> anyhow::Result<()> {
2198 let corpus = TempDir::new()?;
2199 let project_dir = corpus.path().join("-tmp-pond-test");
2200 let parent_uuid = "77777777-7777-7777-7777-777777777777";
2201 let wf_id = "wf_030e6487-da6";
2202 let agent_hash = "a38f4724ef3864da8";
2203 let wf_dir = project_dir
2204 .join(parent_uuid)
2205 .join("subagents")
2206 .join("workflows")
2207 .join(wf_id);
2208 std::fs::create_dir_all(&wf_dir)?;
2209
2210 let parent_row = serde_json::json!({
2211 "type": "user",
2212 "uuid": "u-parent-1",
2213 "sessionId": parent_uuid,
2214 "cwd": "/tmp/pond-test",
2215 "timestamp": "2026-06-04T00:00:00.000Z",
2216 "message": {"role": "user", "content": "hi parent"},
2217 });
2218 std::fs::write(
2219 project_dir.join(format!("{parent_uuid}.jsonl")),
2220 format!("{parent_row}\n"),
2221 )?;
2222
2223 let agent_row = serde_json::json!({
2224 "type": "user",
2225 "uuid": "u-agent-1",
2226 "sessionId": parent_uuid,
2227 "cwd": "/tmp/pond-test",
2228 "timestamp": "2026-06-04T00:01:00.000Z",
2229 "message": {"role": "user", "content": "workflow agent prompt"},
2230 });
2231 std::fs::write(
2232 wf_dir.join(format!("agent-{agent_hash}.jsonl")),
2233 format!("{agent_row}\n"),
2234 )?;
2235
2236 std::fs::write(
2238 wf_dir.join("journal.jsonl"),
2239 "{\"type\":\"started\",\"key\":\"v2:abc\",\"agentId\":\"a38f\"}\n\
2240 {\"type\":\"result\",\"key\":\"v2:abc\",\"agentId\":\"a38f\",\"result\":{}}\n",
2241 )?;
2242
2243 let store_dir = TempDir::new()?;
2244 let store = Store::open_local(store_dir.path()).await?;
2245 let adapter = ClaudeCodeAdapter::new(corpus.path());
2246 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
2247 assert_eq!(
2248 summary.skipped_files, 0,
2249 "journal.jsonl is a control file (benign Empty skip), not an unsupported failure",
2250 );
2251
2252 let child = store
2253 .get_session(&format!(
2254 "{parent_uuid}/workflows/{wf_id}/agent-{agent_hash}"
2255 ))
2256 .await?
2257 .expect("the sibling agent transcript still ingests as a child session");
2258 assert_eq!(
2259 child.session.parent_session_id.as_deref(),
2260 Some(parent_uuid)
2261 );
2262
2263 let parent = store
2264 .get_session(parent_uuid)
2265 .await?
2266 .expect("parent session ingests");
2267 assert_eq!(
2268 parent.messages.len(),
2269 1,
2270 "journal rows must NOT merge into the parent session",
2271 );
2272 Ok(())
2273 }
2274
2275 #[tokio::test(flavor = "multi_thread")]
2279 async fn workflow_journal_with_parent_sessionid_still_not_merged() -> anyhow::Result<()> {
2280 let corpus = TempDir::new()?;
2281 let project_dir = corpus.path().join("-tmp-pond-test");
2282 let parent_uuid = "88888888-8888-8888-8888-888888888888";
2283 let wf_dir = project_dir
2284 .join(parent_uuid)
2285 .join("subagents")
2286 .join("workflows")
2287 .join("wf_abc01234-def");
2288 std::fs::create_dir_all(&wf_dir)?;
2289
2290 let parent_row = serde_json::json!({
2291 "type": "user",
2292 "uuid": "u-parent",
2293 "sessionId": parent_uuid,
2294 "cwd": "/tmp/pond-test",
2295 "timestamp": "2026-06-04T00:00:00.000Z",
2296 "message": {"role": "user", "content": "parent only"},
2297 });
2298 std::fs::write(
2299 project_dir.join(format!("{parent_uuid}.jsonl")),
2300 format!("{parent_row}\n"),
2301 )?;
2302
2303 let journal_row = serde_json::json!({
2306 "type": "started",
2307 "key": "v2:abc",
2308 "agentId": "a1",
2309 "sessionId": parent_uuid,
2310 "message": {"role": "user", "content": "must not merge"},
2311 });
2312 std::fs::write(wf_dir.join("journal.jsonl"), format!("{journal_row}\n"))?;
2313
2314 let store_dir = TempDir::new()?;
2315 let store = Store::open_local(store_dir.path()).await?;
2316 let adapter = ClaudeCodeAdapter::new(corpus.path());
2317 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
2318 assert_eq!(
2319 summary.skipped_files, 0,
2320 "journal is a benign Empty skip, not an unsupported failure",
2321 );
2322 let parent = store
2323 .get_session(parent_uuid)
2324 .await?
2325 .expect("parent session ingests");
2326 assert_eq!(
2327 parent.messages.len(),
2328 1,
2329 "journal row must NOT merge even when it carries the parent sessionId",
2330 );
2331 Ok(())
2332 }
2333}