1use std::{
10 collections::{HashMap, HashSet},
11 path::{Path, PathBuf},
12};
13
14use anyhow::Context as _;
15use chrono::{DateTime, SecondsFormat, Utc};
16use serde_json::{Value, json};
17
18use crate::{
19 sessions::IngestEvent,
20 wire::{FileData, Message, Part, PartKind, Provenance, ProviderOptions, Session},
21};
22
23use super::{
24 Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
25 RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
26 empty_options,
27 extract::{
28 Extracted, Source, extract_compact_repr, extract_raw_record, extract_self_str, extract_str,
29 },
30 extracted_text,
31 jsonl::{BoundedRow, JsonlTree, jsonl_tree_discover, jsonl_tree_events, source_line},
32 jsonl_bytes, part_id, part_ordinal, raw_record,
33};
34
35#[derive(Debug, Default)]
56pub(crate) struct FileState {
57 seen_uuids: HashSet<String>,
58 tool_call_names: HashMap<String, Extracted<String>>,
59}
60
/// Adapter identifier: the factory and JSONL-tree name, and the adapter named in errors.
const NAME: &str = "claude-code";

/// Factory that opens `ClaudeCodeAdapter`s and serializes sessions back into Claude Code's
/// on-disk JSONL layout.
pub struct ClaudeCodeFactory;
69
70impl AdapterFactory for ClaudeCodeFactory {
71 fn name(&self) -> &'static str {
72 NAME
73 }
74
75 fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
76 Ok(Box::new(ClaudeCodeAdapter::new(config_path(NAME, config)?)))
77 }
78
79 fn probe_default(&self, env: &Env) -> Option<Value> {
80 let path = env.home.join(".claude").join("projects");
81 path.exists().then(|| json!({ "path": path }))
82 }
83
84 fn serialize(
85 &self,
86 session: &crate::sessions::SessionWithMessages,
87 fidelity: RestoreFidelity,
88 ) -> Result<Vec<RestoredFile>, AdapterError> {
89 serialize_session(session, fidelity)
90 }
91}
92
93fn serialize_session(
94 session: &crate::sessions::SessionWithMessages,
95 fidelity: RestoreFidelity,
96) -> Result<Vec<RestoredFile>, AdapterError> {
97 let mut messages = session.messages.clone();
98 if fidelity == RestoreFidelity::Native {
99 messages.sort_by(|left, right| {
100 source_line(left.message.options())
101 .cmp(&source_line(right.message.options()))
102 .then_with(|| by_timestamp_then_id(left, right))
103 });
104 } else {
105 messages.sort_by(by_timestamp_then_id);
106 }
107 let mut records = Vec::with_capacity(messages.len());
111 let mut parent_uuid = None::<String>;
112 for message in &messages {
113 if fidelity == RestoreFidelity::Native
114 && let Some(raw) = raw_record(message.message.options())
115 {
116 parent_uuid = raw
117 .get("uuid")
118 .and_then(Value::as_str)
119 .map(ToOwned::to_owned)
120 .or(parent_uuid);
121 records.push(raw);
122 continue;
123 }
124 let Some(record) = claude_record(session, message, parent_uuid.as_deref()) else {
127 continue;
128 };
129 parent_uuid = record
130 .get("uuid")
131 .and_then(Value::as_str)
132 .map(ToOwned::to_owned);
133 records.push(record);
134 }
135
136 let mut files = vec![RestoredFile::new(
137 claude_relative_path(session),
138 jsonl_bytes(NAME, &records)?,
139 fidelity,
140 )];
141 if session.session.parent_session_id.is_some()
142 && let Some(meta) = subagent_meta_record(session)
143 {
144 let mut meta_path = files[0].relative_path.clone();
145 meta_path.set_extension("meta.json");
146 files.push(RestoredFile::new(
147 meta_path,
148 serde_json::to_vec(&meta).map_err(|err| {
149 AdapterError::schema(
150 NAME,
151 &session.session.id,
152 format!("json encode failed: {err}"),
153 )
154 })?,
155 fidelity,
156 ));
157 }
158 Ok(files)
159}
160
161fn claude_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
162 let encoded_project = session
163 .session
164 .options
165 .get("source")
166 .and_then(|source| source.get("project_dir"))
167 .and_then(Value::as_str)
168 .map(ToOwned::to_owned)
169 .unwrap_or_else(|| encode_project(&session.session.project));
170 if let Some(parent) = &session.session.parent_session_id {
171 let child_suffix = session
176 .session
177 .id
178 .strip_prefix(&format!("{parent}/"))
179 .unwrap_or(&session.session.id);
180 return PathBuf::from(encoded_project)
181 .join(parent)
182 .join("subagents")
183 .join(format!("{child_suffix}.jsonl"));
184 }
185 PathBuf::from(encoded_project).join(format!("{}.jsonl", session.session.id))
186}
187
/// Encodes a workspace path as a Claude Code project directory name by replacing every `/`
/// and `.` with `-`.
fn encode_project(project: &str) -> String {
    project
        .chars()
        .map(|ch| match ch {
            '/' | '.' => '-',
            other => other,
        })
        .collect()
}
191
192fn subagent_meta_record(session: &crate::sessions::SessionWithMessages) -> Option<Value> {
193 let meta = session.session.options.get("subagent")?.get("meta")?;
197 meta.is_object().then(|| meta.clone())
198}
199
200fn claude_record(
201 session: &crate::sessions::SessionWithMessages,
202 message: &crate::sessions::MessageWithParts,
203 parent_uuid: Option<&str>,
204) -> Option<Value> {
205 let row_role = match &message.message {
213 Message::System { .. } => return None,
214 Message::User { .. } | Message::Tool { .. } => "user",
215 Message::Assistant { .. } => "assistant",
216 };
217 let mut envelope = serde_json::Map::new();
218 envelope.insert("role".to_owned(), Value::String(row_role.to_owned()));
219 if row_role == "assistant" {
220 envelope.insert("type".to_owned(), Value::String("message".to_owned()));
223 }
224 envelope.insert(
225 "content".to_owned(),
226 Value::Array(message.parts.iter().map(claude_part).collect()),
227 );
228 Some(json!({
229 "parentUuid": parent_uuid,
230 "isSidechain": false,
231 "userType": "external",
232 "cwd": &*session.session.project,
233 "sessionId": &session.session.id,
234 "type": row_role,
235 "message": Value::Object(envelope),
236 "uuid": message.message.id(),
237 "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
238 }))
239}
240
241fn claude_part(part: &Part) -> Value {
242 match &part.kind {
243 PartKind::Text { text } => json!({"type": "text", "text": extracted_text(text)}),
244 PartKind::Reasoning { text } => {
245 json!({"type": "thinking", "thinking": extracted_text(text)})
246 }
247 PartKind::ToolCall {
248 call_id,
249 name,
250 params,
251 provider_executed,
252 } => json!({
253 "type": if *provider_executed { "server_tool_use" } else { "tool_use" },
254 "id": extracted_text(call_id),
255 "name": extracted_text(name),
256 "input": params,
257 }),
258 PartKind::ToolResult {
259 call_id,
260 is_failure,
261 result,
262 ..
263 } => json!({
264 "type": "tool_result",
265 "tool_use_id": extracted_text(call_id),
266 "is_error": is_failure,
267 "content": result,
268 }),
269 PartKind::File {
270 media_type,
271 file_name,
272 data,
273 } => json!({
274 "type": "file",
275 "media_type": media_type,
276 "file_name": file_name,
277 "source": file_source(data),
278 }),
279 other => {
280 json!({"type": "text", "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null))})
281 }
282 }
283}
284
285fn file_source(data: &FileData) -> Value {
286 match data {
287 FileData::String(value) => json!({"type": "text", "data": value}),
288 FileData::Bytes(value) => json!({"type": "base64", "data": value}),
289 FileData::Url(value) => json!({"type": "url", "url": value}),
290 }
291}
292
/// Ingests Claude Code JSONL transcripts from a directory tree.
#[derive(Clone, Debug)]
pub struct ClaudeCodeAdapter {
    /// Directory tree handed to the JSONL walker (the factory probes `~/.claude/projects`).
    root: PathBuf,
}

impl ClaudeCodeAdapter {
    /// Creates an adapter rooted at `root`.
    pub fn new(root: impl Into<PathBuf>) -> Self {
        let root = root.into();
        Self { root }
    }
}
305
306impl Adapter for ClaudeCodeAdapter {
307 fn discover(&self) -> DiscoverFuture<'_> {
308 jsonl_tree_discover(self)
309 }
310
311 fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
312 jsonl_tree_events(self, oracle)
313 }
314}
315
316impl JsonlTree for ClaudeCodeAdapter {
317 type State = FileState;
318
319 fn name(&self) -> &'static str {
320 NAME
321 }
322
323 fn root(&self) -> &Path {
324 &self.root
325 }
326
327 fn peek_session_id(&self, path: &Path, first_line: &str) -> Option<String> {
328 if subagents_dir(path).is_some() {
335 let (parent_uuid, child_suffix, _) = subagent_ids(path)?;
336 return Some(format!("{parent_uuid}/{child_suffix}"));
337 }
338 let row: Value = serde_json::from_str(first_line).ok()?;
339 row.get("sessionId")?.as_str().map(ToOwned::to_owned)
340 }
341
342 fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
343 session_from_rows(path, rows)
344 }
345
346 fn events_from_row(
347 &self,
348 session: &Session,
349 row: &BoundedRow,
350 state: &mut Self::State,
351 ) -> Result<Vec<IngestEvent>, String> {
352 if let Some(uuid) = row.value.get("uuid").and_then(Value::as_str)
353 && !state.seen_uuids.insert(uuid.to_owned())
354 {
355 return Ok(Vec::new());
356 }
357 capture_tool_call_names(&row.value, &mut state.tool_call_names);
358 events_from_row(&session.id, row.line, &row.value, session.created_at, state)
359 }
360
361 fn unsupported_reason(&self, path: &Path) -> Option<String> {
362 if subagents_dir(path).is_some() && subagent_ids(path).is_none() {
368 return Some(format!(
369 "{}: subagent transcript layout not recognized by this pond version; \
370 skipped so it is not merged into the parent session - update pond and \
371 re-run `pond sync`",
372 path.display()
373 ));
374 }
375 None
376 }
377}
378
379fn capture_tool_call_names(row: &Value, map: &mut HashMap<String, Extracted<String>>) {
384 let Some(items) = row
385 .get("message")
386 .and_then(|message| message.get("content"))
387 .and_then(Value::as_array)
388 else {
389 return;
390 };
391 for item in items {
392 let kind = item.get("type").and_then(Value::as_str);
393 if !matches!(kind, Some("tool_use") | Some("server_tool_use")) {
394 continue;
395 }
396 let (Some(id), Some(name)) = (item.str_field("id"), extract_str(item, "name")) else {
397 continue;
398 };
399 map.insert(id.to_owned(), name);
400 }
401}
402
403fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
404 let path_display = path.display().to_string();
405 let mut created_at = None;
406 let mut project: Option<Extracted<String>> = None;
407 let mut version = None;
408 for row in rows {
409 if created_at.is_none() {
410 created_at = parse_timestamp(&row.value).ok();
411 }
412 if project.is_none() {
413 project = extract_str(&row.value, "cwd");
414 }
415 if version.is_none() {
416 version = row
417 .value
418 .get("version")
419 .and_then(Value::as_str)
420 .map(ToOwned::to_owned);
421 }
422 }
423
424 let first = rows
425 .first()
426 .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
427 let at_first = format!("{path_display}:{}", first.line);
428 let raw_session_id = first
429 .value
430 .get("sessionId")
431 .and_then(Value::as_str)
432 .ok_or_else(|| {
433 AdapterError::schema(
434 NAME,
435 at_first.clone(),
436 format!("line {} missing sessionId", first.line),
437 )
438 })?
439 .to_owned();
440 let created_at = created_at.ok_or_else(|| {
441 AdapterError::schema(NAME, at_first, "session has no parseable timestamp")
442 })?;
443
444 let subagent = subagent_descriptor(path);
457 let project_dir = source_project_dir(path, subagent.is_some());
458 let (session_id, parent_session_id, source_agent, subagent_options) = match subagent {
459 Some(SubagentDescriptor {
460 parent_uuid,
461 child_suffix,
462 agent_hash,
463 agent_type,
464 meta,
465 }) => {
466 let child_id = format!("{parent_uuid}/{child_suffix}");
467 let agent_label = agent_type
468 .as_deref()
469 .map(|t| format!("claude-code/{t}"))
470 .unwrap_or_else(|| "claude-code/subagent".to_owned());
471 let metadata = json!({
475 "hash": agent_hash,
476 "raw_session_id": raw_session_id,
477 "meta": meta,
478 });
479 (child_id, Some(parent_uuid), agent_label, Some(metadata))
480 }
481 None => (raw_session_id, None, "claude-code".to_owned(), None),
482 };
483
484 let project = match project {
485 Some(value) => value,
486 None => {
487 let decoded = path
488 .parent()
489 .and_then(|p| p.file_name())
490 .and_then(|n| n.to_str())
491 .map(|s| s.replace('-', "/"))
492 .ok_or_else(|| {
493 AdapterError::schema(
494 NAME,
495 path_display.clone(),
496 "no `cwd` field in any row and source path is not UTF-8",
497 )
498 })?;
499 extract_self_str(&Value::String(decoded)).ok_or_else(|| {
500 AdapterError::schema(
501 NAME,
502 path_display.clone(),
503 "internal: Value::String produced None from Source::as_str",
504 )
505 })?
506 }
507 };
508
509 let mut options = ProviderOptions::new();
510 options.insert(
511 "source".to_owned(),
512 json!({
513 "adapter": "claude-code",
514 "version": version,
515 "project_dir": project_dir,
516 "workspace_path": &*project,
517 }),
518 );
519 if let Some(metadata) = subagent_options {
520 options.insert("subagent".to_owned(), metadata);
521 }
522
523 Ok(Session {
524 id: session_id,
525 parent_session_id,
526 parent_message_id: None,
527 source_agent,
528 created_at,
529 project,
530 options,
531 })
532}
533
534fn source_project_dir(path: &Path, is_subagent: bool) -> Option<String> {
535 let project_dir = if is_subagent {
540 subagents_dir(path)?.parent()?.parent()
541 } else {
542 path.parent()
543 };
544 project_dir
545 .and_then(|p| p.file_name())
546 .and_then(|n| n.to_str())
547 .map(ToOwned::to_owned)
548}
549
/// Nearest proper ancestor of `path` whose final component is exactly `subagents`.
///
/// `path` itself is not considered, only its parents.
fn subagents_dir(path: &Path) -> Option<&Path> {
    path.ancestors()
        .skip(1)
        .find(|dir| dir.file_name().and_then(|name| name.to_str()) == Some("subagents"))
}
564
565struct SubagentDescriptor {
571 parent_uuid: String,
572 child_suffix: String,
573 agent_hash: String,
574 agent_type: Option<String>,
575 meta: Option<Value>,
576}
577
578fn subagent_ids(path: &Path) -> Option<(String, String, String)> {
586 let file_name = path.file_name()?.to_str()?;
587 let agent_hash = file_name
588 .strip_prefix("agent-")?
589 .strip_suffix(".jsonl")?
590 .to_owned();
591 let subagents = subagents_dir(path)?;
592 let parent_uuid = subagents.parent()?.file_name()?.to_str()?.to_owned();
593 let child_suffix = path
597 .strip_prefix(subagents)
598 .ok()?
599 .with_extension("")
600 .to_str()?
601 .replace(std::path::MAIN_SEPARATOR, "/");
602 Some((parent_uuid, child_suffix, agent_hash))
603}
604
605fn subagent_descriptor(path: &Path) -> Option<SubagentDescriptor> {
608 let (parent_uuid, child_suffix, agent_hash) = subagent_ids(path)?;
609 let meta_path = path.parent()?.join(format!("agent-{agent_hash}.meta.json"));
610 let (agent_type, meta) = match std::fs::read(&meta_path) {
611 Ok(bytes) => match serde_json::from_slice::<Value>(&bytes) {
612 Ok(value) => (
613 value
614 .get("agentType")
615 .and_then(Value::as_str)
616 .map(ToOwned::to_owned),
617 Some(value),
618 ),
619 Err(error) => {
620 tracing::debug!(
621 target: "pond::adapter::claude_code",
622 meta = %meta_path.display(),
623 %error,
624 "subagent .meta.json present but unparseable; falling back to 'claude-code/subagent'",
625 );
626 (None, None)
627 }
628 },
629 Err(error) if error.kind() == std::io::ErrorKind::NotFound => (None, None),
630 Err(error) => {
631 tracing::debug!(
632 target: "pond::adapter::claude_code",
633 meta = %meta_path.display(),
634 %error,
635 "subagent .meta.json IO error; falling back to 'claude-code/subagent'",
636 );
637 (None, None)
638 }
639 };
640
641 Some(SubagentDescriptor {
642 parent_uuid,
643 child_suffix,
644 agent_hash,
645 agent_type,
646 meta,
647 })
648}
649
650fn events_from_row(
651 session_id: &str,
652 line: usize,
653 row: &Value,
654 default_timestamp: DateTime<Utc>,
655 state: &FileState,
656) -> Result<Vec<IngestEvent>, String> {
657 let timestamp = parse_timestamp(row).unwrap_or(default_timestamp);
658 let uuid = row
659 .get("uuid")
660 .and_then(Value::as_str)
661 .map_or_else(|| format!("{session_id}:{line}"), ToOwned::to_owned);
662
663 if let Some(message_value) = row.get("message") {
664 return message_events(
665 session_id,
666 &uuid,
667 timestamp,
668 row,
669 message_value,
670 state,
671 line,
672 );
673 }
674
675 let raw_type = row.get("type").and_then(Value::as_str);
682 let content = if raw_type == Some("attachment") {
683 row.get("attachment")
684 .and_then(attachment_content)
685 .or_else(|| Some(extract_compact_repr(row)))
686 } else {
687 extract_str(row, "subtype").or_else(|| extract_str(row, "type"))
688 };
689 let message = Message::System {
690 id: uuid,
691 session_id: session_id.to_owned(),
692 timestamp,
693 content,
694 options: row_options(row, line),
695 };
696 Ok(vec![IngestEvent::Message(message)])
697}
698
699fn message_events(
700 session_id: &str,
701 uuid: &str,
702 timestamp: DateTime<Utc>,
703 row: &Value,
704 message_value: &Value,
705 state: &FileState,
706 line: usize,
707) -> Result<Vec<IngestEvent>, String> {
708 let role = message_value
709 .get("role")
710 .and_then(Value::as_str)
711 .ok_or_else(|| "message missing role".to_owned())?;
712 let content = message_value.get("content").unwrap_or(&Value::Null);
713 let mut parts = Vec::new();
714 let message = match (role, content) {
715 ("user", Value::String(text)) => {
716 let provenance = user_text_provenance(row, text);
720 parts.push(text_part(
721 session_id,
722 uuid,
723 0,
724 extract_self_str(content),
725 provenance,
726 ));
727 Message::User {
728 id: uuid.to_owned(),
729 session_id: session_id.to_owned(),
730 timestamp,
731 options: row_options(row, line),
732 }
733 }
734 ("user", Value::Array(items)) if items.iter().all(is_tool_result) => {
735 let source_tool_result = row.get("toolUseResult").cloned();
736 parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
737 tool_result_part(
738 session_id,
739 uuid,
740 ordinal,
741 item,
742 source_tool_result.as_ref(),
743 state,
744 )
745 }));
746 Message::Tool {
747 id: uuid.to_owned(),
748 session_id: session_id.to_owned(),
749 timestamp,
750 options: row_options(row, line),
751 }
752 }
753 ("user", Value::Array(items)) => {
754 let provenance = user_array_provenance(row, items);
757 parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
758 user_part(session_id, uuid, ordinal, item, state, provenance)
759 }));
760 Message::User {
761 id: uuid.to_owned(),
762 session_id: session_id.to_owned(),
763 timestamp,
764 options: row_options(row, line),
765 }
766 }
767 ("assistant", Value::Array(items)) => {
768 parts.extend(
769 items
770 .iter()
771 .enumerate()
772 .map(|(ordinal, item)| assistant_part(session_id, uuid, ordinal, item)),
773 );
774 Message::Assistant {
775 id: uuid.to_owned(),
776 session_id: session_id.to_owned(),
777 timestamp,
778 options: assistant_options(row, message_value, line),
779 }
780 }
781 ("system", Value::String(_)) => Message::System {
782 id: uuid.to_owned(),
783 session_id: session_id.to_owned(),
784 timestamp,
785 content: extract_self_str(content),
786 options: row_options(row, line),
787 },
788 ("system", _) => Message::System {
789 id: uuid.to_owned(),
790 session_id: session_id.to_owned(),
791 timestamp,
792 content: Some(extract_compact_repr(message_value)),
797 options: row_options(row, line),
798 },
799 (other, _) => {
800 return Err(format!("unsupported message role {other}"));
801 }
802 };
803
804 let mut events = Vec::with_capacity(parts.len() + 1);
805 events.push(IngestEvent::Message(message));
806 events.extend(parts.into_iter().map(IngestEvent::Part));
807 Ok(events)
808}
809
810fn text_part(
811 session_id: &str,
812 message_id: &str,
813 ordinal: usize,
814 text: Option<Extracted<String>>,
815 provenance: Provenance,
816) -> Part {
817 Part {
818 session_id: session_id.to_owned(),
819 id: part_id(message_id, ordinal),
820 message_id: message_id.to_owned(),
821 ordinal: part_ordinal(ordinal),
822 provenance,
823 options: empty_options(),
824 kind: PartKind::Text { text },
825 }
826}
827
828fn user_part(
829 session_id: &str,
830 message_id: &str,
831 ordinal: usize,
832 value: &Value,
833 state: &FileState,
834 provenance: Provenance,
835) -> Part {
836 match value.get("type").and_then(Value::as_str) {
837 Some("text") => text_part(
838 session_id,
839 message_id,
840 ordinal,
841 extract_str(value, "text"),
842 provenance,
843 ),
844 Some("image") | Some("file") => {
845 file_part(session_id, message_id, ordinal, value, provenance)
846 }
847 Some("tool_result") => {
848 tool_result_part(session_id, message_id, ordinal, value, None, state)
849 }
850 _ => text_part(
854 session_id,
855 message_id,
856 ordinal,
857 Some(extract_compact_repr(value)),
858 provenance,
859 ),
860 }
861}
862
863fn assistant_part(session_id: &str, message_id: &str, ordinal: usize, value: &Value) -> Part {
864 match value.get("type").and_then(Value::as_str) {
868 Some("text") => text_part(
869 session_id,
870 message_id,
871 ordinal,
872 extract_str(value, "text"),
873 Provenance::Conversational,
874 ),
875 Some("thinking") => Part {
876 session_id: session_id.to_owned(),
877 id: part_id(message_id, ordinal),
878 message_id: message_id.to_owned(),
879 ordinal: part_ordinal(ordinal),
880 provenance: Provenance::Conversational,
881 options: signature_options(value),
882 kind: PartKind::Reasoning {
883 text: extract_str(value, "thinking"),
884 },
885 },
886 Some("tool_use") => Part {
887 session_id: session_id.to_owned(),
888 id: part_id(message_id, ordinal),
889 message_id: message_id.to_owned(),
890 ordinal: part_ordinal(ordinal),
891 provenance: Provenance::Conversational,
892 options: empty_options(),
893 kind: PartKind::ToolCall {
894 call_id: extract_str(value, "id"),
895 name: extract_str(value, "name"),
896 params: value.get("input").cloned().unwrap_or(Value::Null),
897 provider_executed: false,
898 },
899 },
900 Some("server_tool_use") => Part {
901 session_id: session_id.to_owned(),
902 id: part_id(message_id, ordinal),
903 message_id: message_id.to_owned(),
904 ordinal: part_ordinal(ordinal),
905 provenance: Provenance::Conversational,
906 options: empty_options(),
907 kind: PartKind::ToolCall {
908 call_id: extract_str(value, "id"),
909 name: extract_str(value, "name"),
910 params: value.get("input").cloned().unwrap_or(Value::Null),
911 provider_executed: true,
912 },
913 },
914 Some("image") | Some("file") => file_part(
915 session_id,
916 message_id,
917 ordinal,
918 value,
919 Provenance::Conversational,
920 ),
921 _ => text_part(
924 session_id,
925 message_id,
926 ordinal,
927 Some(extract_compact_repr(value)),
928 Provenance::Conversational,
929 ),
930 }
931}
932
933fn tool_result_part(
934 session_id: &str,
935 message_id: &str,
936 ordinal: usize,
937 value: &Value,
938 source_tool_result: Option<&Value>,
939 state: &FileState,
940) -> Part {
941 let call_id = extract_str(value, "tool_use_id");
942 let name = value
948 .str_field("tool_use_id")
949 .and_then(|id| state.tool_call_names.get(id))
950 .cloned();
951 let result = value
952 .get("content")
953 .cloned()
954 .or_else(|| source_tool_result.cloned())
955 .unwrap_or(Value::Null);
956 Part {
957 session_id: session_id.to_owned(),
958 id: part_id(message_id, ordinal),
959 message_id: message_id.to_owned(),
960 ordinal: part_ordinal(ordinal),
961 provenance: Provenance::Injected,
964 options: empty_options(),
965 kind: PartKind::ToolResult {
966 call_id,
967 name,
968 is_failure: value
969 .get("is_error")
970 .and_then(Value::as_bool)
971 .unwrap_or(false),
972 result,
973 },
974 }
975}
976
977fn file_part(
978 session_id: &str,
979 message_id: &str,
980 ordinal: usize,
981 value: &Value,
982 provenance: Provenance,
983) -> Part {
984 let media_type = value
985 .get("media_type")
986 .or_else(|| value.get("mime_type"))
987 .and_then(Value::as_str)
988 .map(ToOwned::to_owned);
989 let file_name = value
990 .get("file_name")
991 .or_else(|| value.get("name"))
992 .and_then(Value::as_str)
993 .map(ToOwned::to_owned);
994 let data = if let Some(source) = value.get("source") {
995 if let Some(url) = source.get("url").and_then(Value::as_str) {
996 FileData::Url(url.to_owned())
997 } else if let Some(bytes) = source.get("data").and_then(Value::as_str) {
998 FileData::String(bytes.to_owned())
999 } else {
1000 FileData::String(compact_json(source))
1001 }
1002 } else if let Some(url) = value.get("url").and_then(Value::as_str) {
1003 FileData::Url(url.to_owned())
1004 } else {
1005 FileData::String(compact_json(value))
1006 };
1007
1008 Part {
1009 session_id: session_id.to_owned(),
1010 id: part_id(message_id, ordinal),
1011 message_id: message_id.to_owned(),
1012 ordinal: part_ordinal(ordinal),
1013 provenance,
1014 options: empty_options(),
1015 kind: PartKind::File {
1016 media_type,
1017 file_name,
1018 data,
1019 },
1020 }
1021}
1022
1023fn row_options(row: &Value, line: usize) -> ProviderOptions {
1024 let mut options = ProviderOptions::new();
1025 let source = json!({
1026 "line": line,
1027 "parent_uuid": row.get("parentUuid"),
1028 "is_sidechain": row.get("isSidechain"),
1029 "user_type": row.get("userType"),
1030 "entrypoint": row.get("entrypoint"),
1031 "cwd": row.get("cwd"),
1032 "version": row.get("version"),
1033 "git_branch": row.get("gitBranch"),
1034 "request_id": row.get("requestId"),
1035 "raw_type": row.get("type"),
1036 "raw_record": extract_raw_record(row),
1037 });
1038 options.insert("source".to_owned(), source);
1039 options
1040}
1041
1042fn assistant_options(row: &Value, message_value: &Value, line: usize) -> ProviderOptions {
1043 let mut options = row_options(row, line);
1044 let anthropic = json!({
1045 "id": message_value.get("id"),
1046 "model": message_value.get("model"),
1047 "stop_reason": message_value.get("stop_reason"),
1048 "stop_sequence": message_value.get("stop_sequence"),
1049 "usage": message_value.get("usage"),
1050 });
1051 options.insert("anthropic".to_owned(), anthropic);
1052 options
1053}
1054
1055fn signature_options(value: &Value) -> ProviderOptions {
1056 let mut options = ProviderOptions::new();
1057 if let Some(signature) = value.get("signature").and_then(Value::as_str) {
1058 options.insert("anthropic".to_owned(), json!({"signature": signature}));
1059 }
1060 options
1061}
1062
1063fn attachment_content(value: &Value) -> Option<Extracted<String>> {
1064 extract_str(value, "content").or_else(|| extract_str(value, "stdout"))
1065}
1066
1067fn parse_timestamp(value: &Value) -> anyhow::Result<DateTime<Utc>> {
1068 let timestamp = value
1069 .get("timestamp")
1070 .and_then(Value::as_str)
1071 .context("missing timestamp")?;
1072 Ok(DateTime::parse_from_rfc3339(timestamp)
1073 .context("invalid timestamp")?
1074 .with_timezone(&Utc))
1075}
1076
1077fn is_tool_result(value: &Value) -> bool {
1078 value.get("type").and_then(Value::as_str) == Some("tool_result")
1079}
1080
1081fn is_meta_row(row: &Value) -> bool {
1084 row.get("isMeta").and_then(Value::as_bool) == Some(true)
1085}
1086
/// Whether user text starts, after leading whitespace, with one of the fixed markers that
/// the provenance helpers classify as injected.
///
/// The markers are command and task-notification tags, local-command caveat/stdout tags, or
/// an interruption notice.
fn is_injected_user_text(text: &str) -> bool {
    const INJECTED_PREFIXES: [&str; 7] = [
        "<task-notification>",
        "<command-name>",
        "<command-message>",
        "<command-args>",
        "<local-command-caveat>",
        "<local-command-stdout>",
        "[Request interrupted by user",
    ];
    let text = text.trim_start();
    INJECTED_PREFIXES
        .iter()
        .any(|prefix| text.starts_with(prefix))
}
1100
1101fn user_text_provenance(row: &Value, text: &str) -> Provenance {
1104 if is_meta_row(row) || is_injected_user_text(text) {
1105 Provenance::Injected
1106 } else {
1107 Provenance::Conversational
1108 }
1109}
1110
1111fn user_array_provenance(row: &Value, items: &[Value]) -> Provenance {
1115 if is_meta_row(row) {
1116 return Provenance::Injected;
1117 }
1118 let wrapped = items.iter().any(|item| {
1119 item.get("type").and_then(Value::as_str) == Some("text")
1120 && item
1121 .get("text")
1122 .and_then(Value::as_str)
1123 .is_some_and(is_injected_user_text)
1124 });
1125 if wrapped {
1126 Provenance::Injected
1127 } else {
1128 Provenance::Conversational
1129 }
1130}
1131
1132#[cfg(test)]
1133mod tests {
1134 #![allow(clippy::expect_used, clippy::unwrap_used)]
1142
1143 use super::*;
1144 use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
1145 use tempfile::TempDir;
1146
1147 const FIXTURE_ROOT: &str = "tests/fixtures/adapter/claude_code/projects";
1148
1149 #[test]
1150 fn probe_default_finds_claude_projects_under_home() -> anyhow::Result<()> {
1151 crate::adapter::test_support::assert_probe_default(
1152 &ClaudeCodeFactory,
1153 &[".claude", "projects"],
1154 )
1155 }
1156
1157 #[tokio::test(flavor = "multi_thread")]
1158 async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
1159 let adapter = ClaudeCodeAdapter::new(FIXTURE_ROOT);
1160 crate::adapter::test_support::assert_native_restore(
1161 &ClaudeCodeFactory,
1162 &adapter,
1163 std::path::Path::new(FIXTURE_ROOT),
1164 )
1165 .await
1166 }
1167
1168 #[tokio::test(flavor = "multi_thread")]
1176 async fn subagent_file_derives_child_session_with_parent_link() -> anyhow::Result<()> {
1177 let corpus = TempDir::new()?;
1178 let project_dir = corpus.path().join("-tmp-pond-test");
1179 let parent_uuid = "11111111-1111-1111-1111-111111111111";
1180 let agent_hash = "abc123def456";
1181 std::fs::create_dir_all(project_dir.join(parent_uuid).join("subagents"))?;
1182
1183 let parent_row = serde_json::json!({
1185 "type": "user",
1186 "uuid": "u-parent-1",
1187 "sessionId": parent_uuid,
1188 "cwd": "/tmp/pond-test",
1189 "timestamp": "2026-05-16T00:00:00.000Z",
1190 "version": "2.1.121",
1191 "message": {"role": "user", "content": "hi parent"},
1192 });
1193 std::fs::write(
1194 project_dir.join(format!("{parent_uuid}.jsonl")),
1195 format!("{parent_row}\n"),
1196 )?;
1197
1198 let subagent_row = serde_json::json!({
1201 "type": "user",
1202 "uuid": "u-sub-1",
1203 "sessionId": parent_uuid,
1204 "cwd": "/tmp/pond-test",
1205 "isSidechain": true,
1206 "agentId": agent_hash,
1207 "timestamp": "2026-05-16T00:01:00.000Z",
1208 "version": "2.1.121",
1209 "message": {"role": "user", "content": "subagent prompt"},
1210 });
1211 std::fs::write(
1212 project_dir
1213 .join(parent_uuid)
1214 .join("subagents")
1215 .join(format!("agent-{agent_hash}.jsonl")),
1216 format!("{subagent_row}\n"),
1217 )?;
1218 std::fs::write(
1219 project_dir
1220 .join(parent_uuid)
1221 .join("subagents")
1222 .join(format!("agent-{agent_hash}.meta.json")),
1223 r#"{"agentType":"general-purpose","description":"do a thing"}"#,
1224 )?;
1225
1226 let store_dir = TempDir::new()?;
1227 let store = Store::open_local(store_dir.path()).await?;
1228 let adapter = ClaudeCodeAdapter::new(corpus.path());
1229
1230 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1231 assert_eq!(
1232 summary.dropped_sessions, 0,
1233 "subagent file must NOT collide with parent (pre-fix this was the project-immutable rejection)"
1234 );
1235
1236 let parent = store
1237 .get_session(parent_uuid)
1238 .await?
1239 .expect("parent session should ingest as the bare uuid");
1240 assert_eq!(parent.session.source_agent, "claude-code");
1241 assert_eq!(parent.session.parent_session_id, None);
1242
1243 let child_id = format!("{parent_uuid}/agent-{agent_hash}");
1244 let child = store
1245 .get_session(&child_id)
1246 .await?
1247 .expect("subagent session must surface under the derived id");
1248 assert_eq!(
1249 child.session.source_agent, "claude-code/general-purpose",
1250 "agent_type from .meta.json should suffix the source_agent label"
1251 );
1252 assert_eq!(
1253 child.session.parent_session_id.as_deref(),
1254 Some(parent_uuid),
1255 "subagent must link back to parent via parent_session_id",
1256 );
1257 let subagent_meta = child
1258 .session
1259 .options
1260 .get("subagent")
1261 .expect("options.subagent must carry the hash + verbatim meta.json");
1262 assert_eq!(subagent_meta["hash"], serde_json::json!(agent_hash));
1263 assert_eq!(
1264 subagent_meta["meta"]["agentType"],
1265 serde_json::json!("general-purpose")
1266 );
1267 assert_eq!(
1268 subagent_meta["meta"]["description"],
1269 serde_json::json!("do a thing")
1270 );
1271 Ok(())
1272 }
1273
1274 #[tokio::test(flavor = "multi_thread")]
1278 async fn subagent_without_meta_falls_back_to_generic_label() -> anyhow::Result<()> {
1279 let corpus = TempDir::new()?;
1280 let project_dir = corpus.path().join("-tmp-pond-test");
1281 let parent_uuid = "22222222-2222-2222-2222-222222222222";
1282 let agent_hash = "deadbeef";
1283 std::fs::create_dir_all(project_dir.join(parent_uuid).join("subagents"))?;
1284 let row = serde_json::json!({
1285 "type": "user",
1286 "uuid": "u-sub-only",
1287 "sessionId": parent_uuid,
1288 "cwd": "/tmp/pond-test",
1289 "timestamp": "2026-05-16T00:00:00.000Z",
1290 "message": {"role": "user", "content": "no meta sibling here"},
1291 });
1292 std::fs::write(
1293 project_dir
1294 .join(parent_uuid)
1295 .join("subagents")
1296 .join(format!("agent-{agent_hash}.jsonl")),
1297 format!("{row}\n"),
1298 )?;
1299
1300 let store_dir = TempDir::new()?;
1301 let store = Store::open_local(store_dir.path()).await?;
1302 let adapter = ClaudeCodeAdapter::new(corpus.path());
1303 let _summary =
1304 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1305
1306 let child = store
1307 .get_session(&format!("{parent_uuid}/agent-{agent_hash}"))
1308 .await?
1309 .expect("derived child id even without meta");
1310 assert_eq!(child.session.source_agent, "claude-code/subagent");
1311 Ok(())
1312 }
1313
1314 #[tokio::test(flavor = "multi_thread")]
1322 async fn workflow_nested_subagent_derives_distinct_child_not_parent_collision()
1323 -> anyhow::Result<()> {
1324 let corpus = TempDir::new()?;
1325 let project_dir = corpus.path().join("-tmp-pond-test");
1326 let parent_uuid = "44444444-4444-4444-4444-444444444444";
1327 let wf_id = "wf_abcd1234-ef0";
1328 let agent_hash = "cafef00dbaadf00d1";
1329 let wf_dir = project_dir
1330 .join(parent_uuid)
1331 .join("subagents")
1332 .join("workflows")
1333 .join(wf_id);
1334 std::fs::create_dir_all(&wf_dir)?;
1335
1336 let parent_row = serde_json::json!({
1337 "type": "user",
1338 "uuid": "u-parent-1",
1339 "sessionId": parent_uuid,
1340 "cwd": "/tmp/pond-test",
1341 "timestamp": "2026-05-20T00:00:00.000Z",
1342 "message": {"role": "user", "content": "hi parent"},
1343 });
1344 std::fs::write(
1345 project_dir.join(format!("{parent_uuid}.jsonl")),
1346 format!("{parent_row}\n"),
1347 )?;
1348
1349 let subagent_row = serde_json::json!({
1351 "type": "user",
1352 "uuid": "u-wf-sub-1",
1353 "sessionId": parent_uuid,
1354 "cwd": "/tmp/pond-test/packages/sub",
1355 "isSidechain": true,
1356 "agentId": agent_hash,
1357 "timestamp": "2026-05-20T00:01:00.000Z",
1358 "message": {"role": "user", "content": "workflow subagent prompt"},
1359 });
1360 std::fs::write(
1361 wf_dir.join(format!("agent-{agent_hash}.jsonl")),
1362 format!("{subagent_row}\n"),
1363 )?;
1364 std::fs::write(
1365 wf_dir.join(format!("agent-{agent_hash}.meta.json")),
1366 r#"{"agentType":"general-purpose","description":"workflow child"}"#,
1367 )?;
1368
1369 let store_dir = TempDir::new()?;
1370 let store = Store::open_local(store_dir.path()).await?;
1371 let adapter = ClaudeCodeAdapter::new(corpus.path());
1372 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1373 assert_eq!(
1374 summary.dropped_sessions, 0,
1375 "nested workflow subagent must NOT collide with the parent project",
1376 );
1377
1378 let parent = store
1379 .get_session(parent_uuid)
1380 .await?
1381 .expect("parent session ingests under the bare uuid");
1382 assert_eq!(&*parent.session.project, "/tmp/pond-test");
1383 assert_eq!(parent.session.parent_session_id, None);
1384
1385 let child_id = format!("{parent_uuid}/workflows/{wf_id}/agent-{agent_hash}");
1386 let child = store
1387 .get_session(&child_id)
1388 .await?
1389 .expect("workflow subagent surfaces under the full nested child id");
1390 assert_eq!(child.session.source_agent, "claude-code/general-purpose");
1391 assert_eq!(
1392 child.session.parent_session_id.as_deref(),
1393 Some(parent_uuid)
1394 );
1395 assert_eq!(
1396 &*child.session.project, "/tmp/pond-test/packages/sub",
1397 "child keeps its own cwd-derived project, distinct from the parent",
1398 );
1399 let subagent_meta = child
1400 .session
1401 .options
1402 .get("subagent")
1403 .expect("options.subagent present");
1404 assert_eq!(subagent_meta["hash"], serde_json::json!(agent_hash));
1405 Ok(())
1406 }
1407
1408 #[tokio::test(flavor = "multi_thread")]
1414 async fn unrecognized_subagents_file_fails_visibly_not_merged() -> anyhow::Result<()> {
1415 let corpus = TempDir::new()?;
1416 let project_dir = corpus.path().join("-tmp-pond-test");
1417 let parent_uuid = "55555555-5555-5555-5555-555555555555";
1418 let unknown_dir = project_dir
1419 .join(parent_uuid)
1420 .join("subagents")
1421 .join("workflows")
1422 .join("wf_future01-aaa");
1423 std::fs::create_dir_all(&unknown_dir)?;
1424
1425 let parent_row = serde_json::json!({
1426 "type": "user",
1427 "uuid": "u-parent-only",
1428 "sessionId": parent_uuid,
1429 "cwd": "/tmp/pond-test",
1430 "timestamp": "2026-05-20T00:00:00.000Z",
1431 "message": {"role": "user", "content": "parent message"},
1432 });
1433 std::fs::write(
1434 project_dir.join(format!("{parent_uuid}.jsonl")),
1435 format!("{parent_row}\n"),
1436 )?;
1437
1438 let unknown_row = serde_json::json!({
1441 "type": "user",
1442 "uuid": "u-should-not-merge",
1443 "sessionId": parent_uuid,
1444 "cwd": "/tmp/pond-test",
1445 "timestamp": "2026-05-20T00:02:00.000Z",
1446 "message": {"role": "user", "content": "must not land under parent"},
1447 });
1448 std::fs::write(
1449 unknown_dir.join("transcript-001.jsonl"),
1450 format!("{unknown_row}\n"),
1451 )?;
1452
1453 let store_dir = TempDir::new()?;
1454 let store = Store::open_local(store_dir.path()).await?;
1455 let adapter = ClaudeCodeAdapter::new(corpus.path());
1456 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1457
1458 assert_eq!(
1459 summary.skipped_files, 1,
1460 "the unrecognized subagents/ transcript must be a visible, counted skip",
1461 );
1462 let parent = store
1463 .get_session(parent_uuid)
1464 .await?
1465 .expect("parent session ingests");
1466 assert_eq!(
1467 parent.messages.len(),
1468 1,
1469 "the unrecognized file's row must NOT be merged into the parent session",
1470 );
1471 assert!(
1472 parent
1473 .messages
1474 .iter()
1475 .all(|m| m.message.id() != "u-should-not-merge"),
1476 "parent must not absorb the unrecognized file's message",
1477 );
1478 Ok(())
1479 }
1480
1481 #[tokio::test(flavor = "multi_thread")]
1489 async fn unrecognized_subagents_file_stays_visible_under_parent_watermark() -> anyhow::Result<()>
1490 {
1491 struct ParentAlreadyFresh;
1492 impl crate::adapter::SkipOracle for ParentAlreadyFresh {
1493 fn last_ingested_at(&self, _session_id: &str) -> Option<DateTime<Utc>> {
1494 Some(
1498 DateTime::parse_from_rfc3339("2999-01-01T00:00:00Z")
1499 .unwrap()
1500 .with_timezone(&Utc),
1501 )
1502 }
1503 fn is_empty(&self) -> bool {
1504 false
1505 }
1506 }
1507
1508 let corpus = TempDir::new()?;
1509 let project_dir = corpus.path().join("-tmp-pond-test");
1510 let parent_uuid = "66666666-6666-6666-6666-666666666666";
1511 let unknown_dir = project_dir
1512 .join(parent_uuid)
1513 .join("subagents")
1514 .join("workflows")
1515 .join("wf_future02-bbb");
1516 std::fs::create_dir_all(&unknown_dir)?;
1517
1518 let parent_row = serde_json::json!({
1519 "type": "user",
1520 "uuid": "u-parent-fresh",
1521 "sessionId": parent_uuid,
1522 "cwd": "/tmp/pond-test",
1523 "timestamp": "2026-05-20T00:00:00.000Z",
1524 "message": {"role": "user", "content": "parent message"},
1525 });
1526 std::fs::write(
1527 project_dir.join(format!("{parent_uuid}.jsonl")),
1528 format!("{parent_row}\n"),
1529 )?;
1530
1531 let unknown_row = serde_json::json!({
1534 "type": "user",
1535 "uuid": "u-resync-should-stay-visible",
1536 "sessionId": parent_uuid,
1537 "cwd": "/tmp/pond-test",
1538 "timestamp": "2026-05-20T00:02:00.000Z",
1539 "message": {"role": "user", "content": "must stay visible"},
1540 });
1541 std::fs::write(
1542 unknown_dir.join("transcript-002.jsonl"),
1543 format!("{unknown_row}\n"),
1544 )?;
1545
1546 let store_dir = TempDir::new()?;
1547 let store = Store::open_local(store_dir.path()).await?;
1548 let adapter = ClaudeCodeAdapter::new(corpus.path());
1549 let summary = ingest_adapter(&store, &adapter, &ParentAlreadyFresh, |_| {}).await?;
1550
1551 assert_eq!(
1552 summary.skipped_files, 1,
1553 "the unrecognized transcript must stay a visible Unsupported skip, not be fresh-skipped under the parent's watermark",
1554 );
1555 assert_eq!(
1558 summary.skipped_fresh, 1,
1559 "only the parent may fresh-skip; the unrecognized file must not borrow its watermark",
1560 );
1561 Ok(())
1562 }
1563
1564 #[tokio::test(flavor = "multi_thread")]
1569 async fn replay_duplicates_are_dedup_at_adapter_layer() -> anyhow::Result<()> {
1570 let corpus = TempDir::new()?;
1571 let project_dir = corpus.path().join("-tmp-pond-test");
1572 std::fs::create_dir_all(&project_dir)?;
1573 let session_uuid = "33333333-3333-3333-3333-333333333333";
1574 let dup_uuid = "u-shared-1";
1575 let row = serde_json::json!({
1576 "type": "user",
1577 "uuid": dup_uuid,
1578 "sessionId": session_uuid,
1579 "cwd": "/tmp/pond-test",
1580 "timestamp": "2026-05-16T00:00:00.000Z",
1581 "message": {"role": "user", "content": "replayed three times"},
1582 });
1583 let body = format!("{row}\n{row}\n{row}\n");
1585 std::fs::write(project_dir.join(format!("{session_uuid}.jsonl")), body)?;
1586
1587 let store_dir = TempDir::new()?;
1588 let store = Store::open_local(store_dir.path()).await?;
1589 let adapter = ClaudeCodeAdapter::new(corpus.path());
1590 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1591
1592 assert_eq!(
1593 summary.dropped_events, 0,
1594 "adapter must dedupe replays before they reach the validator"
1595 );
1596 assert!(
1597 !summary
1598 .drop_reasons
1599 .contains_key(crate::sessions::DROP_REASON_DUPLICATE_MESSAGE_ID),
1600 "duplicate_message_id bucket stays empty when adapter does its job"
1601 );
1602 Ok(())
1603 }
1604
1605 #[tokio::test(flavor = "multi_thread")]
1609 async fn tool_result_name_resolves_from_prior_tool_use_in_same_file() -> anyhow::Result<()> {
1610 let corpus = TempDir::new()?;
1611 let project_dir = corpus.path().join("-tmp-pond-test");
1612 std::fs::create_dir_all(&project_dir)?;
1613 let session_uuid = "44444444-4444-4444-4444-444444444444";
1614 let call_id = "toolu_test_01";
1615
1616 let tool_use_row = serde_json::json!({
1617 "type": "assistant",
1618 "uuid": "u-call",
1619 "sessionId": session_uuid,
1620 "cwd": "/tmp/pond-test",
1621 "timestamp": "2026-05-16T00:00:00.000Z",
1622 "message": {
1623 "role": "assistant",
1624 "content": [{
1625 "type": "tool_use",
1626 "id": call_id,
1627 "name": "Edit",
1628 "input": {"file_path": "/tmp/foo"},
1629 }],
1630 },
1631 });
1632 let tool_result_row = serde_json::json!({
1633 "type": "user",
1634 "uuid": "u-result",
1635 "sessionId": session_uuid,
1636 "cwd": "/tmp/pond-test",
1637 "timestamp": "2026-05-16T00:00:01.000Z",
1638 "message": {
1639 "role": "user",
1640 "content": [{
1641 "type": "tool_result",
1642 "tool_use_id": call_id,
1643 "content": "ok",
1644 }],
1645 },
1646 });
1647 std::fs::write(
1648 project_dir.join(format!("{session_uuid}.jsonl")),
1649 format!("{tool_use_row}\n{tool_result_row}\n"),
1650 )?;
1651
1652 let store_dir = TempDir::new()?;
1653 let store = Store::open_local(store_dir.path()).await?;
1654 let adapter = ClaudeCodeAdapter::new(corpus.path());
1655 let _summary =
1656 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1657 let session = store
1658 .get_session(session_uuid)
1659 .await?
1660 .expect("session ingests");
1661
1662 let mut saw_call = false;
1663 let mut saw_result = false;
1664 for stored in &session.messages {
1665 for part in &stored.parts {
1666 match &part.kind {
1667 PartKind::ToolCall {
1668 call_id: cid, name, ..
1669 } => {
1670 assert_eq!(cid.as_ref().map(|e| e.as_str()), Some(call_id));
1671 assert_eq!(
1672 name.as_ref().map(|e| e.as_str()),
1673 Some("Edit"),
1674 "tool_use carries the name directly"
1675 );
1676 saw_call = true;
1677 }
1678 PartKind::ToolResult {
1679 call_id: cid, name, ..
1680 } => {
1681 assert_eq!(cid.as_ref().map(|e| e.as_str()), Some(call_id));
1682 assert_eq!(
1683 name.as_ref().map(|e| e.as_str()),
1684 Some("Edit"),
1685 "tool_result resolves the name via the per-file map (was 'unknown' pre-2026-05-16)"
1686 );
1687 saw_result = true;
1688 }
1689 _ => {}
1690 }
1691 }
1692 }
1693 assert!(saw_call && saw_result, "both parts must be present");
1694 Ok(())
1695 }
1696
1697 #[test]
1701 fn user_text_provenance_separates_prompts_from_harness_injection() {
1702 let prompt = json!({"type": "user", "uuid": "u1"});
1703 assert_eq!(
1704 user_text_provenance(&prompt, "please refactor the parser"),
1705 Provenance::Conversational,
1706 );
1707
1708 let notification = json!({"type": "user", "uuid": "u2"});
1709 assert_eq!(
1710 user_text_provenance(
1711 ¬ification,
1712 "<task-notification>background task done</task-notification>",
1713 ),
1714 Provenance::Injected,
1715 );
1716
1717 let meta = json!({"type": "user", "uuid": "u3", "isMeta": true});
1718 assert_eq!(
1719 user_text_provenance(&meta, "expanded skill body"),
1720 Provenance::Injected,
1721 );
1722 }
1723
1724 #[tokio::test(flavor = "multi_thread")]
1728 async fn task_notification_message_yields_injected_parts() -> anyhow::Result<()> {
1729 let corpus = TempDir::new()?;
1730 let project_dir = corpus.path().join("-tmp-pond-test");
1731 std::fs::create_dir_all(&project_dir)?;
1732 let session_uuid = "66666666-6666-6666-6666-666666666666";
1733 let prompt = serde_json::json!({
1734 "type": "user",
1735 "uuid": "u-prompt",
1736 "sessionId": session_uuid,
1737 "cwd": "/tmp/pond-test",
1738 "timestamp": "2026-05-16T00:00:00.000Z",
1739 "message": {"role": "user", "content": "genuine human prompt"},
1740 });
1741 let notification = serde_json::json!({
1742 "type": "user",
1743 "uuid": "u-notify",
1744 "sessionId": session_uuid,
1745 "cwd": "/tmp/pond-test",
1746 "timestamp": "2026-05-16T00:00:01.000Z",
1747 "message": {
1748 "role": "user",
1749 "content": "<task-notification>a background task finished</task-notification>",
1750 },
1751 });
1752 std::fs::write(
1753 project_dir.join(format!("{session_uuid}.jsonl")),
1754 format!("{prompt}\n{notification}\n"),
1755 )?;
1756
1757 let store_dir = TempDir::new()?;
1758 let store = Store::open_local(store_dir.path()).await?;
1759 let adapter = ClaudeCodeAdapter::new(corpus.path());
1760 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1761
1762 let session = store
1763 .get_session(session_uuid)
1764 .await?
1765 .expect("session ingests");
1766 let mut saw_prompt = false;
1767 let mut saw_notification = false;
1768 for stored in &session.messages {
1769 for part in &stored.parts {
1770 if stored.message.id() == "u-prompt" {
1771 assert_eq!(part.provenance, crate::wire::Provenance::Conversational);
1772 saw_prompt = true;
1773 }
1774 if stored.message.id() == "u-notify" {
1775 assert_eq!(part.provenance, crate::wire::Provenance::Injected);
1776 saw_notification = true;
1777 }
1778 }
1779 }
1780 assert!(saw_prompt && saw_notification, "both messages present");
1781 Ok(())
1782 }
1783
1784 #[tokio::test(flavor = "multi_thread")]
1788 async fn orphan_tool_result_yields_name_none_not_unknown_sentinel() -> anyhow::Result<()> {
1789 let corpus = TempDir::new()?;
1790 let project_dir = corpus.path().join("-tmp-pond-test");
1791 std::fs::create_dir_all(&project_dir)?;
1792 let session_uuid = "55555555-5555-5555-5555-555555555555";
1793
1794 let row = serde_json::json!({
1796 "type": "user",
1797 "uuid": "u-orphan",
1798 "sessionId": session_uuid,
1799 "cwd": "/tmp/pond-test",
1800 "timestamp": "2026-05-16T00:00:00.000Z",
1801 "message": {
1802 "role": "user",
1803 "content": [{
1804 "type": "tool_result",
1805 "tool_use_id": "toolu_orphan",
1806 "content": "result body, no matching call",
1807 }],
1808 },
1809 });
1810 std::fs::write(
1811 project_dir.join(format!("{session_uuid}.jsonl")),
1812 format!("{row}\n"),
1813 )?;
1814
1815 let store_dir = TempDir::new()?;
1816 let store = Store::open_local(store_dir.path()).await?;
1817 let adapter = ClaudeCodeAdapter::new(corpus.path());
1818 let _summary =
1819 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1820 let session = store
1821 .get_session(session_uuid)
1822 .await?
1823 .expect("session ingests");
1824 let mut found = false;
1825 for stored in &session.messages {
1826 for part in &stored.parts {
1827 if let PartKind::ToolResult { name, call_id, .. } = &part.kind {
1828 assert_eq!(call_id.as_ref().map(|e| e.as_str()), Some("toolu_orphan"));
1829 assert!(
1830 name.is_none(),
1831 "orphan tool_result must be name=None, not synthesized 'unknown'",
1832 );
1833 found = true;
1834 }
1835 }
1836 }
1837 assert!(found, "orphan tool_result part must be present");
1838 Ok(())
1840 }
1841}