1use std::{
10 collections::{HashMap, HashSet},
11 path::{Path, PathBuf},
12};
13
14use anyhow::Context as _;
15use chrono::{DateTime, SecondsFormat, Utc};
16use serde_json::{Value, json};
17
18use crate::{
19 sessions::IngestEvent,
20 wire::{FileData, Message, Part, PartKind, Provenance, ProviderOptions, Session},
21};
22
23use super::{
24 Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
25 RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
26 empty_options,
27 extract::{
28 Extracted, Source, extract_compact_repr, extract_raw_record, extract_self_str, extract_str,
29 },
30 extracted_text,
31 jsonl::{BoundedRow, JsonlTree, jsonl_tree_discover, jsonl_tree_events},
32 jsonl_bytes, part_id, raw_record,
33};
34
/// Mutable per-file state carried across rows by `JsonlTree::events_from_row`.
#[derive(Debug, Default)]
pub(crate) struct FileState {
    /// `uuid` values of rows already processed in this file; a row whose
    /// `uuid` is already present is skipped, so replayed rows are emitted once.
    seen_uuids: HashSet<String>,
    /// Tool-call id -> tool name, captured from `tool_use` / `server_tool_use`
    /// content items so later `tool_result` parts can be labelled with a name.
    tool_call_names: HashMap<String, Extracted<String>>,
}
60
/// Adapter identifier, used as the factory name and in error reports.
const NAME: &str = "claude-code";
65
/// Factory that opens, probes for, and serializes Claude Code sessions.
pub struct ClaudeCodeFactory;
69
70impl AdapterFactory for ClaudeCodeFactory {
71 fn name(&self) -> &'static str {
72 NAME
73 }
74
75 fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
76 Ok(Box::new(ClaudeCodeAdapter::new(config_path(NAME, config)?)))
77 }
78
79 fn probe_default(&self, env: &Env) -> Option<Value> {
80 let path = env.home.join(".claude").join("projects");
81 path.exists().then(|| json!({ "path": path }))
82 }
83
84 fn serialize(
85 &self,
86 session: &crate::sessions::SessionWithMessages,
87 fidelity: RestoreFidelity,
88 ) -> Result<Vec<RestoredFile>, AdapterError> {
89 serialize_session(session, fidelity)
90 }
91}
92
93fn serialize_session(
94 session: &crate::sessions::SessionWithMessages,
95 fidelity: RestoreFidelity,
96) -> Result<Vec<RestoredFile>, AdapterError> {
97 let mut messages = session.messages.clone();
98 if fidelity == RestoreFidelity::Native {
99 messages.sort_by(|left, right| {
100 source_line(left.message.options())
101 .cmp(&source_line(right.message.options()))
102 .then_with(|| by_timestamp_then_id(left, right))
103 });
104 } else {
105 messages.sort_by(by_timestamp_then_id);
106 }
107 let mut records = Vec::with_capacity(messages.len());
111 let mut parent_uuid = None::<String>;
112 for message in &messages {
113 if fidelity == RestoreFidelity::Native
114 && let Some(raw) = raw_record(message.message.options())
115 {
116 parent_uuid = raw
117 .get("uuid")
118 .and_then(Value::as_str)
119 .map(ToOwned::to_owned)
120 .or(parent_uuid);
121 records.push(raw);
122 continue;
123 }
124 let Some(record) = claude_record(session, message, parent_uuid.as_deref()) else {
127 continue;
128 };
129 parent_uuid = record
130 .get("uuid")
131 .and_then(Value::as_str)
132 .map(ToOwned::to_owned);
133 records.push(record);
134 }
135
136 let mut files = vec![RestoredFile {
137 relative_path: claude_relative_path(session),
138 bytes: jsonl_bytes(NAME, &records)?,
139 }];
140 if session.session.parent_session_id.is_some()
141 && let Some(meta) = subagent_meta_record(session)
142 {
143 let mut meta_path = files[0].relative_path.clone();
144 meta_path.set_extension("meta.json");
145 files.push(RestoredFile {
146 relative_path: meta_path,
147 bytes: serde_json::to_vec(&meta).map_err(|err| {
148 AdapterError::schema(
149 NAME,
150 &session.session.id,
151 format!("json encode failed: {err}"),
152 )
153 })?,
154 });
155 }
156 Ok(files)
157}
158
159fn claude_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
160 let encoded_project = session
161 .session
162 .options
163 .get("source")
164 .and_then(|source| source.get("project_dir"))
165 .and_then(Value::as_str)
166 .map(ToOwned::to_owned)
167 .unwrap_or_else(|| encode_project(&session.session.project));
168 if let Some(parent) = &session.session.parent_session_id {
169 let agent = session
170 .session
171 .id
172 .rsplit('/')
173 .next()
174 .unwrap_or(&session.session.id);
175 return PathBuf::from(encoded_project)
176 .join(parent)
177 .join("subagents")
178 .join(format!("{agent}.jsonl"));
179 }
180 PathBuf::from(encoded_project).join(format!("{}.jsonl", session.session.id))
181}
182
183fn source_line(options: &ProviderOptions) -> Option<u64> {
184 options
185 .get("source")
186 .and_then(|source| source.get("line"))
187 .and_then(Value::as_u64)
188}
189
/// Encodes a workspace path as a project directory name by mapping every
/// `/` and `.` to `-`. All other characters are kept.
fn encode_project(project: &str) -> String {
    project
        .chars()
        .map(|ch| if ch == '/' || ch == '.' { '-' } else { ch })
        .collect()
}
193
194fn subagent_meta_record(session: &crate::sessions::SessionWithMessages) -> Option<Value> {
195 let meta = session.session.options.get("subagent")?.get("meta")?;
199 meta.is_object().then(|| meta.clone())
200}
201
202fn claude_record(
203 session: &crate::sessions::SessionWithMessages,
204 message: &crate::sessions::MessageWithParts,
205 parent_uuid: Option<&str>,
206) -> Option<Value> {
207 let row_role = match &message.message {
215 Message::System { .. } => return None,
216 Message::User { .. } | Message::Tool { .. } => "user",
217 Message::Assistant { .. } => "assistant",
218 };
219 let mut envelope = serde_json::Map::new();
220 envelope.insert("role".to_owned(), Value::String(row_role.to_owned()));
221 if row_role == "assistant" {
222 envelope.insert("type".to_owned(), Value::String("message".to_owned()));
225 }
226 envelope.insert(
227 "content".to_owned(),
228 Value::Array(message.parts.iter().map(claude_part).collect()),
229 );
230 Some(json!({
231 "parentUuid": parent_uuid,
232 "isSidechain": false,
233 "userType": "external",
234 "cwd": &*session.session.project,
235 "sessionId": &session.session.id,
236 "type": row_role,
237 "message": Value::Object(envelope),
238 "uuid": message.message.id(),
239 "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
240 }))
241}
242
243fn claude_part(part: &Part) -> Value {
244 match &part.kind {
245 PartKind::Text { text } => json!({"type": "text", "text": extracted_text(text)}),
246 PartKind::Reasoning { text } => {
247 json!({"type": "thinking", "thinking": extracted_text(text)})
248 }
249 PartKind::ToolCall {
250 call_id,
251 name,
252 params,
253 provider_executed,
254 } => json!({
255 "type": if *provider_executed { "server_tool_use" } else { "tool_use" },
256 "id": extracted_text(call_id),
257 "name": extracted_text(name),
258 "input": params,
259 }),
260 PartKind::ToolResult {
261 call_id,
262 is_failure,
263 result,
264 ..
265 } => json!({
266 "type": "tool_result",
267 "tool_use_id": extracted_text(call_id),
268 "is_error": is_failure,
269 "content": result,
270 }),
271 PartKind::File {
272 media_type,
273 file_name,
274 data,
275 } => json!({
276 "type": "file",
277 "media_type": media_type,
278 "file_name": file_name,
279 "source": file_source(data),
280 }),
281 other => {
282 json!({"type": "text", "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null))})
283 }
284 }
285}
286
287fn file_source(data: &FileData) -> Value {
288 match data {
289 FileData::String(value) => json!({"type": "text", "data": value}),
290 FileData::Bytes(value) => json!({"type": "base64", "data": value}),
291 FileData::Url(value) => json!({"type": "url", "url": value}),
292 }
293}
294
/// Adapter that ingests Claude Code JSONL transcripts found under `root`.
#[derive(Debug, Clone)]
pub struct ClaudeCodeAdapter {
    /// Directory walked for transcripts. `probe_default` suggests
    /// `<home>/.claude/projects` for this.
    root: PathBuf,
}
301
302impl ClaudeCodeAdapter {
303 pub fn new(root: impl Into<PathBuf>) -> Self {
304 Self { root: root.into() }
305 }
306}
307
/// Discovery and event streaming are delegated to the shared JSONL-tree
/// helpers, passing `self`.
impl Adapter for ClaudeCodeAdapter {
    /// Enumerates sessions via `jsonl_tree_discover`.
    fn discover(&self) -> DiscoverFuture<'_> {
        jsonl_tree_discover(self)
    }

    /// Streams ingest events via `jsonl_tree_events`, forwarding `oracle`
    /// (presumably used to skip already-ingested input; confirm in `SkipOracle`).
    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
        jsonl_tree_events(self, oracle)
    }
}
317
318impl JsonlTree for ClaudeCodeAdapter {
319 type State = FileState;
320
321 fn name(&self) -> &'static str {
322 NAME
323 }
324
325 fn root(&self) -> &Path {
326 &self.root
327 }
328
329 fn peek_session_id(&self, path: &Path, first_line: &str) -> Option<String> {
330 if let Some((parent_uuid, agent_hash)) = subagent_ids(path) {
331 return Some(format!("{parent_uuid}/agent-{agent_hash}"));
332 }
333 let row: Value = serde_json::from_str(first_line).ok()?;
334 row.get("sessionId")?.as_str().map(ToOwned::to_owned)
335 }
336
337 fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
338 session_from_rows(path, rows)
339 }
340
341 fn events_from_row(
342 &self,
343 session: &Session,
344 row: &BoundedRow,
345 state: &mut Self::State,
346 ) -> Result<Vec<IngestEvent>, String> {
347 if let Some(uuid) = row.value.get("uuid").and_then(Value::as_str)
348 && !state.seen_uuids.insert(uuid.to_owned())
349 {
350 return Ok(Vec::new());
351 }
352 capture_tool_call_names(&row.value, &mut state.tool_call_names);
353 events_from_row(&session.id, row.line, &row.value, session.created_at, state)
354 }
355}
356
357fn capture_tool_call_names(row: &Value, map: &mut HashMap<String, Extracted<String>>) {
362 let Some(items) = row
363 .get("message")
364 .and_then(|message| message.get("content"))
365 .and_then(Value::as_array)
366 else {
367 return;
368 };
369 for item in items {
370 let kind = item.get("type").and_then(Value::as_str);
371 if !matches!(kind, Some("tool_use") | Some("server_tool_use")) {
372 continue;
373 }
374 let (Some(id), Some(name)) = (item.str_field("id"), extract_str(item, "name")) else {
375 continue;
376 };
377 map.insert(id.to_owned(), name);
378 }
379}
380
381fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
382 let path_display = path.display().to_string();
383 let mut created_at = None;
384 let mut project: Option<Extracted<String>> = None;
385 let mut version = None;
386 for row in rows {
387 if created_at.is_none() {
388 created_at = parse_timestamp(&row.value).ok();
389 }
390 if project.is_none() {
391 project = extract_str(&row.value, "cwd");
392 }
393 if version.is_none() {
394 version = row
395 .value
396 .get("version")
397 .and_then(Value::as_str)
398 .map(ToOwned::to_owned);
399 }
400 }
401
402 let first = rows
403 .first()
404 .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
405 let at_first = format!("{path_display}:{}", first.line);
406 let raw_session_id = first
407 .value
408 .get("sessionId")
409 .and_then(Value::as_str)
410 .ok_or_else(|| {
411 AdapterError::schema(
412 NAME,
413 at_first.clone(),
414 format!("line {} missing sessionId", first.line),
415 )
416 })?
417 .to_owned();
418 let created_at = created_at.ok_or_else(|| {
419 AdapterError::schema(NAME, at_first, "session has no parseable timestamp")
420 })?;
421
422 let subagent = subagent_descriptor(path);
430 let project_dir = source_project_dir(path, subagent.is_some());
431 let (session_id, parent_session_id, source_agent, subagent_options) = match subagent {
432 Some(SubagentDescriptor {
433 parent_uuid,
434 agent_hash,
435 agent_type,
436 meta,
437 }) => {
438 let child_id = format!("{parent_uuid}/agent-{agent_hash}");
439 let agent_label = agent_type
440 .as_deref()
441 .map(|t| format!("claude-code/{t}"))
442 .unwrap_or_else(|| "claude-code/subagent".to_owned());
443 let metadata = json!({
447 "hash": agent_hash,
448 "raw_session_id": raw_session_id,
449 "meta": meta,
450 });
451 (child_id, Some(parent_uuid), agent_label, Some(metadata))
452 }
453 None => (raw_session_id, None, "claude-code".to_owned(), None),
454 };
455
456 let project = match project {
457 Some(value) => value,
458 None => {
459 let decoded = path
460 .parent()
461 .and_then(|p| p.file_name())
462 .and_then(|n| n.to_str())
463 .map(|s| s.replace('-', "/"))
464 .ok_or_else(|| {
465 AdapterError::schema(
466 NAME,
467 path_display.clone(),
468 "no `cwd` field in any row and source path is not UTF-8",
469 )
470 })?;
471 extract_self_str(&Value::String(decoded)).ok_or_else(|| {
472 AdapterError::schema(
473 NAME,
474 path_display.clone(),
475 "internal: Value::String produced None from Source::as_str",
476 )
477 })?
478 }
479 };
480
481 let mut options = ProviderOptions::new();
482 options.insert(
483 "source".to_owned(),
484 json!({
485 "adapter": "claude-code",
486 "version": version,
487 "project_dir": project_dir,
488 "workspace_path": &*project,
489 }),
490 );
491 if let Some(metadata) = subagent_options {
492 options.insert("subagent".to_owned(), metadata);
493 }
494
495 Ok(Session {
496 id: session_id,
497 parent_session_id,
498 parent_message_id: None,
499 source_agent,
500 created_at,
501 project,
502 options,
503 })
504}
505
/// Returns the name of the encoded project directory containing `path`.
///
/// For a top-level transcript (`<project>/<id>.jsonl`) this is the parent
/// directory. For a subagent transcript
/// (`<project>/<parent>/subagents/agent-*.jsonl`) it is three levels up.
/// Returns `None` if that ancestor is missing, has no file name, or is not UTF-8.
fn source_project_dir(path: &Path, is_subagent: bool) -> Option<String> {
    let depth = if is_subagent { 3 } else { 1 };
    let directory = path.ancestors().nth(depth)?;
    directory.file_name()?.to_str().map(str::to_owned)
}
517
/// Identity and metadata of a subagent transcript at
/// `<parent_uuid>/subagents/agent-<agent_hash>.jsonl`.
struct SubagentDescriptor {
    /// Name of the directory containing `subagents/`; used as the parent session id.
    parent_uuid: String,
    /// The `<hash>` portion of the `agent-<hash>.jsonl` file name.
    agent_hash: String,
    /// `agentType` string from the sibling `.meta.json`, when readable and present.
    agent_type: Option<String>,
    /// The full parsed sibling `.meta.json`, when it could be read and parsed.
    meta: Option<Value>,
}
529
/// Splits a subagent transcript path into `(parent_uuid, agent_hash)`.
///
/// Matches only `.../<parent_uuid>/subagents/agent-<agent_hash>.jsonl`.
/// Returns `None` for any other shape or for non-UTF-8 components.
fn subagent_ids(path: &Path) -> Option<(String, String)> {
    let hash = path
        .file_name()?
        .to_str()?
        .strip_prefix("agent-")?
        .strip_suffix(".jsonl")?;

    let container = path.parent()?;
    let container_name = container.file_name()?.to_str()?;
    if container_name != "subagents" {
        return None;
    }

    let parent = container.parent()?.file_name()?.to_str()?;
    Some((parent.to_owned(), hash.to_owned()))
}
546
547fn subagent_descriptor(path: &Path) -> Option<SubagentDescriptor> {
550 let (parent_uuid, agent_hash) = subagent_ids(path)?;
551 let meta_path = path.parent()?.join(format!("agent-{agent_hash}.meta.json"));
552 let (agent_type, meta) = match std::fs::read(&meta_path) {
553 Ok(bytes) => match serde_json::from_slice::<Value>(&bytes) {
554 Ok(value) => (
555 value
556 .get("agentType")
557 .and_then(Value::as_str)
558 .map(ToOwned::to_owned),
559 Some(value),
560 ),
561 Err(error) => {
562 tracing::debug!(
563 target: "pond::adapter::claude_code",
564 meta = %meta_path.display(),
565 %error,
566 "subagent .meta.json present but unparseable; falling back to 'claude-code/subagent'",
567 );
568 (None, None)
569 }
570 },
571 Err(error) if error.kind() == std::io::ErrorKind::NotFound => (None, None),
572 Err(error) => {
573 tracing::debug!(
574 target: "pond::adapter::claude_code",
575 meta = %meta_path.display(),
576 %error,
577 "subagent .meta.json IO error; falling back to 'claude-code/subagent'",
578 );
579 (None, None)
580 }
581 };
582
583 Some(SubagentDescriptor {
584 parent_uuid,
585 agent_hash,
586 agent_type,
587 meta,
588 })
589}
590
591fn events_from_row(
592 session_id: &str,
593 line: usize,
594 row: &Value,
595 default_timestamp: DateTime<Utc>,
596 state: &FileState,
597) -> Result<Vec<IngestEvent>, String> {
598 let timestamp = parse_timestamp(row).unwrap_or(default_timestamp);
599 let uuid = row
600 .get("uuid")
601 .and_then(Value::as_str)
602 .map_or_else(|| format!("{session_id}:{line}"), ToOwned::to_owned);
603
604 if let Some(message_value) = row.get("message") {
605 return message_events(
606 session_id,
607 &uuid,
608 timestamp,
609 row,
610 message_value,
611 state,
612 line,
613 );
614 }
615
616 let raw_type = row.get("type").and_then(Value::as_str);
623 let content = if raw_type == Some("attachment") {
624 row.get("attachment")
625 .and_then(attachment_content)
626 .or_else(|| Some(extract_compact_repr(row)))
627 } else {
628 extract_str(row, "subtype").or_else(|| extract_str(row, "type"))
629 };
630 let message = Message::System {
631 id: uuid,
632 session_id: session_id.to_owned(),
633 timestamp,
634 content,
635 options: row_options(row, line),
636 };
637 Ok(vec![IngestEvent::Message(message)])
638}
639
640fn message_events(
641 session_id: &str,
642 uuid: &str,
643 timestamp: DateTime<Utc>,
644 row: &Value,
645 message_value: &Value,
646 state: &FileState,
647 line: usize,
648) -> Result<Vec<IngestEvent>, String> {
649 let role = message_value
650 .get("role")
651 .and_then(Value::as_str)
652 .ok_or_else(|| "message missing role".to_owned())?;
653 let content = message_value.get("content").unwrap_or(&Value::Null);
654 let mut parts = Vec::new();
655 let message = match (role, content) {
656 ("user", Value::String(text)) => {
657 let provenance = user_text_provenance(row, text);
661 parts.push(text_part(
662 session_id,
663 uuid,
664 0,
665 extract_self_str(content),
666 provenance,
667 ));
668 Message::User {
669 id: uuid.to_owned(),
670 session_id: session_id.to_owned(),
671 timestamp,
672 options: row_options(row, line),
673 }
674 }
675 ("user", Value::Array(items)) if items.iter().all(is_tool_result) => {
676 let source_tool_result = row.get("toolUseResult").cloned();
677 parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
678 tool_result_part(
679 session_id,
680 uuid,
681 ordinal,
682 item,
683 source_tool_result.as_ref(),
684 state,
685 )
686 }));
687 Message::Tool {
688 id: uuid.to_owned(),
689 session_id: session_id.to_owned(),
690 timestamp,
691 options: row_options(row, line),
692 }
693 }
694 ("user", Value::Array(items)) => {
695 let provenance = user_array_provenance(row, items);
698 parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
699 user_part(session_id, uuid, ordinal, item, state, provenance)
700 }));
701 Message::User {
702 id: uuid.to_owned(),
703 session_id: session_id.to_owned(),
704 timestamp,
705 options: row_options(row, line),
706 }
707 }
708 ("assistant", Value::Array(items)) => {
709 parts.extend(
710 items
711 .iter()
712 .enumerate()
713 .map(|(ordinal, item)| assistant_part(session_id, uuid, ordinal, item)),
714 );
715 Message::Assistant {
716 id: uuid.to_owned(),
717 session_id: session_id.to_owned(),
718 timestamp,
719 options: assistant_options(row, message_value, line),
720 }
721 }
722 ("system", Value::String(_)) => Message::System {
723 id: uuid.to_owned(),
724 session_id: session_id.to_owned(),
725 timestamp,
726 content: extract_self_str(content),
727 options: row_options(row, line),
728 },
729 ("system", _) => Message::System {
730 id: uuid.to_owned(),
731 session_id: session_id.to_owned(),
732 timestamp,
733 content: Some(extract_compact_repr(message_value)),
738 options: row_options(row, line),
739 },
740 (other, _) => {
741 return Err(format!("unsupported message role {other}"));
742 }
743 };
744
745 let mut events = Vec::with_capacity(parts.len() + 1);
746 events.push(IngestEvent::Message(message));
747 events.extend(parts.into_iter().map(IngestEvent::Part));
748 Ok(events)
749}
750
751fn text_part(
752 session_id: &str,
753 message_id: &str,
754 ordinal: usize,
755 text: Option<Extracted<String>>,
756 provenance: Provenance,
757) -> Part {
758 Part {
759 session_id: session_id.to_owned(),
760 id: part_id(message_id, ordinal),
761 message_id: message_id.to_owned(),
762 ordinal: i32::try_from(ordinal).unwrap_or(i32::MAX),
763 provenance,
764 options: empty_options(),
765 kind: PartKind::Text { text },
766 }
767}
768
769fn user_part(
770 session_id: &str,
771 message_id: &str,
772 ordinal: usize,
773 value: &Value,
774 state: &FileState,
775 provenance: Provenance,
776) -> Part {
777 match value.get("type").and_then(Value::as_str) {
778 Some("text") => text_part(
779 session_id,
780 message_id,
781 ordinal,
782 extract_str(value, "text"),
783 provenance,
784 ),
785 Some("image") | Some("file") => {
786 file_part(session_id, message_id, ordinal, value, provenance)
787 }
788 Some("tool_result") => {
789 tool_result_part(session_id, message_id, ordinal, value, None, state)
790 }
791 _ => text_part(
795 session_id,
796 message_id,
797 ordinal,
798 Some(extract_compact_repr(value)),
799 provenance,
800 ),
801 }
802}
803
804fn assistant_part(session_id: &str, message_id: &str, ordinal: usize, value: &Value) -> Part {
805 match value.get("type").and_then(Value::as_str) {
809 Some("text") => text_part(
810 session_id,
811 message_id,
812 ordinal,
813 extract_str(value, "text"),
814 Provenance::Conversational,
815 ),
816 Some("thinking") => Part {
817 session_id: session_id.to_owned(),
818 id: part_id(message_id, ordinal),
819 message_id: message_id.to_owned(),
820 ordinal: i32::try_from(ordinal).unwrap_or(i32::MAX),
821 provenance: Provenance::Conversational,
822 options: signature_options(value),
823 kind: PartKind::Reasoning {
824 text: extract_str(value, "thinking"),
825 },
826 },
827 Some("tool_use") => Part {
828 session_id: session_id.to_owned(),
829 id: part_id(message_id, ordinal),
830 message_id: message_id.to_owned(),
831 ordinal: i32::try_from(ordinal).unwrap_or(i32::MAX),
832 provenance: Provenance::Conversational,
833 options: empty_options(),
834 kind: PartKind::ToolCall {
835 call_id: extract_str(value, "id"),
836 name: extract_str(value, "name"),
837 params: value.get("input").cloned().unwrap_or(Value::Null),
838 provider_executed: false,
839 },
840 },
841 Some("server_tool_use") => Part {
842 session_id: session_id.to_owned(),
843 id: part_id(message_id, ordinal),
844 message_id: message_id.to_owned(),
845 ordinal: i32::try_from(ordinal).unwrap_or(i32::MAX),
846 provenance: Provenance::Conversational,
847 options: empty_options(),
848 kind: PartKind::ToolCall {
849 call_id: extract_str(value, "id"),
850 name: extract_str(value, "name"),
851 params: value.get("input").cloned().unwrap_or(Value::Null),
852 provider_executed: true,
853 },
854 },
855 Some("image") | Some("file") => file_part(
856 session_id,
857 message_id,
858 ordinal,
859 value,
860 Provenance::Conversational,
861 ),
862 _ => text_part(
865 session_id,
866 message_id,
867 ordinal,
868 Some(extract_compact_repr(value)),
869 Provenance::Conversational,
870 ),
871 }
872}
873
874fn tool_result_part(
875 session_id: &str,
876 message_id: &str,
877 ordinal: usize,
878 value: &Value,
879 source_tool_result: Option<&Value>,
880 state: &FileState,
881) -> Part {
882 let call_id = extract_str(value, "tool_use_id");
883 let name = value
889 .str_field("tool_use_id")
890 .and_then(|id| state.tool_call_names.get(id))
891 .cloned();
892 let result = value
893 .get("content")
894 .cloned()
895 .or_else(|| source_tool_result.cloned())
896 .unwrap_or(Value::Null);
897 Part {
898 session_id: session_id.to_owned(),
899 id: part_id(message_id, ordinal),
900 message_id: message_id.to_owned(),
901 ordinal: i32::try_from(ordinal).unwrap_or(i32::MAX),
902 provenance: Provenance::Injected,
905 options: empty_options(),
906 kind: PartKind::ToolResult {
907 call_id,
908 name,
909 is_failure: value
910 .get("is_error")
911 .and_then(Value::as_bool)
912 .unwrap_or(false),
913 result,
914 },
915 }
916}
917
918fn file_part(
919 session_id: &str,
920 message_id: &str,
921 ordinal: usize,
922 value: &Value,
923 provenance: Provenance,
924) -> Part {
925 let media_type = value
926 .get("media_type")
927 .or_else(|| value.get("mime_type"))
928 .and_then(Value::as_str)
929 .unwrap_or("application/octet-stream")
930 .to_owned();
931 let file_name = value
932 .get("file_name")
933 .or_else(|| value.get("name"))
934 .and_then(Value::as_str)
935 .map(ToOwned::to_owned);
936 let data = if let Some(source) = value.get("source") {
937 if let Some(url) = source.get("url").and_then(Value::as_str) {
938 FileData::Url(url.to_owned())
939 } else if let Some(bytes) = source.get("data").and_then(Value::as_str) {
940 FileData::String(bytes.to_owned())
941 } else {
942 FileData::String(compact_json(source))
943 }
944 } else if let Some(url) = value.get("url").and_then(Value::as_str) {
945 FileData::Url(url.to_owned())
946 } else {
947 FileData::String(compact_json(value))
948 };
949
950 Part {
951 session_id: session_id.to_owned(),
952 id: part_id(message_id, ordinal),
953 message_id: message_id.to_owned(),
954 ordinal: i32::try_from(ordinal).unwrap_or(i32::MAX),
955 provenance,
956 options: empty_options(),
957 kind: PartKind::File {
958 media_type,
959 file_name,
960 data,
961 },
962 }
963}
964
965fn row_options(row: &Value, line: usize) -> ProviderOptions {
966 let mut options = ProviderOptions::new();
967 let source = json!({
968 "line": line,
969 "parent_uuid": row.get("parentUuid"),
970 "is_sidechain": row.get("isSidechain"),
971 "user_type": row.get("userType"),
972 "entrypoint": row.get("entrypoint"),
973 "cwd": row.get("cwd"),
974 "version": row.get("version"),
975 "git_branch": row.get("gitBranch"),
976 "request_id": row.get("requestId"),
977 "raw_type": row.get("type"),
978 "raw_record": extract_raw_record(row),
979 });
980 options.insert("source".to_owned(), source);
981 options
982}
983
984fn assistant_options(row: &Value, message_value: &Value, line: usize) -> ProviderOptions {
985 let mut options = row_options(row, line);
986 let anthropic = json!({
987 "id": message_value.get("id"),
988 "model": message_value.get("model"),
989 "stop_reason": message_value.get("stop_reason"),
990 "stop_sequence": message_value.get("stop_sequence"),
991 "usage": message_value.get("usage"),
992 });
993 options.insert("anthropic".to_owned(), anthropic);
994 options
995}
996
997fn signature_options(value: &Value) -> ProviderOptions {
998 let mut options = ProviderOptions::new();
999 if let Some(signature) = value.get("signature").and_then(Value::as_str) {
1000 options.insert("anthropic".to_owned(), json!({"signature": signature}));
1001 }
1002 options
1003}
1004
1005fn attachment_content(value: &Value) -> Option<Extracted<String>> {
1006 extract_str(value, "content").or_else(|| extract_str(value, "stdout"))
1007}
1008
1009fn parse_timestamp(value: &Value) -> anyhow::Result<DateTime<Utc>> {
1010 let timestamp = value
1011 .get("timestamp")
1012 .and_then(Value::as_str)
1013 .context("missing timestamp")?;
1014 Ok(DateTime::parse_from_rfc3339(timestamp)
1015 .context("invalid timestamp")?
1016 .with_timezone(&Utc))
1017}
1018
1019fn is_tool_result(value: &Value) -> bool {
1020 value.get("type").and_then(Value::as_str) == Some("tool_result")
1021}
1022
1023fn is_meta_row(row: &Value) -> bool {
1026 row.get("isMeta").and_then(Value::as_bool) == Some(true)
1027}
1028
/// Returns `true` when user text, ignoring leading whitespace, starts with one
/// of the wrapper markers listed below (command/notification tags or the
/// interruption notice).
fn is_injected_user_text(text: &str) -> bool {
    const INJECTED_PREFIXES: [&str; 7] = [
        "<task-notification>",
        "<command-name>",
        "<command-message>",
        "<command-args>",
        "<local-command-caveat>",
        "<local-command-stdout>",
        "[Request interrupted by user",
    ];
    let body = text.trim_start();
    INJECTED_PREFIXES
        .into_iter()
        .any(|prefix| body.starts_with(prefix))
}
1042
1043fn user_text_provenance(row: &Value, text: &str) -> Provenance {
1046 if is_meta_row(row) || is_injected_user_text(text) {
1047 Provenance::Injected
1048 } else {
1049 Provenance::Conversational
1050 }
1051}
1052
1053fn user_array_provenance(row: &Value, items: &[Value]) -> Provenance {
1057 if is_meta_row(row) {
1058 return Provenance::Injected;
1059 }
1060 let wrapped = items.iter().any(|item| {
1061 item.get("type").and_then(Value::as_str) == Some("text")
1062 && item
1063 .get("text")
1064 .and_then(Value::as_str)
1065 .is_some_and(is_injected_user_text)
1066 });
1067 if wrapped {
1068 Provenance::Injected
1069 } else {
1070 Provenance::Conversational
1071 }
1072}
1073
1074#[cfg(test)]
1075mod tests {
1076 #![allow(clippy::expect_used, clippy::unwrap_used)]
1084
1085 use super::*;
1086 use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
1087 use tempfile::TempDir;
1088
1089 const FIXTURE_ROOT: &str = "tests/fixtures/adapter/claude_code/projects";
1090
1091 #[tokio::test(flavor = "multi_thread")]
1092 async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
1093 let adapter = ClaudeCodeAdapter::new(FIXTURE_ROOT);
1094 crate::adapter::test_support::assert_native_restore(
1095 &ClaudeCodeFactory,
1096 &adapter,
1097 std::path::Path::new(FIXTURE_ROOT),
1098 )
1099 .await
1100 }
1101
1102 #[tokio::test(flavor = "multi_thread")]
1110 async fn subagent_file_derives_child_session_with_parent_link() -> anyhow::Result<()> {
1111 let corpus = TempDir::new()?;
1112 let project_dir = corpus.path().join("-tmp-pond-test");
1113 let parent_uuid = "11111111-1111-1111-1111-111111111111";
1114 let agent_hash = "abc123def456";
1115 std::fs::create_dir_all(project_dir.join(parent_uuid).join("subagents"))?;
1116
1117 let parent_row = serde_json::json!({
1119 "type": "user",
1120 "uuid": "u-parent-1",
1121 "sessionId": parent_uuid,
1122 "cwd": "/tmp/pond-test",
1123 "timestamp": "2026-05-16T00:00:00.000Z",
1124 "version": "2.1.121",
1125 "message": {"role": "user", "content": "hi parent"},
1126 });
1127 std::fs::write(
1128 project_dir.join(format!("{parent_uuid}.jsonl")),
1129 format!("{parent_row}\n"),
1130 )?;
1131
1132 let subagent_row = serde_json::json!({
1135 "type": "user",
1136 "uuid": "u-sub-1",
1137 "sessionId": parent_uuid,
1138 "cwd": "/tmp/pond-test",
1139 "isSidechain": true,
1140 "agentId": agent_hash,
1141 "timestamp": "2026-05-16T00:01:00.000Z",
1142 "version": "2.1.121",
1143 "message": {"role": "user", "content": "subagent prompt"},
1144 });
1145 std::fs::write(
1146 project_dir
1147 .join(parent_uuid)
1148 .join("subagents")
1149 .join(format!("agent-{agent_hash}.jsonl")),
1150 format!("{subagent_row}\n"),
1151 )?;
1152 std::fs::write(
1153 project_dir
1154 .join(parent_uuid)
1155 .join("subagents")
1156 .join(format!("agent-{agent_hash}.meta.json")),
1157 r#"{"agentType":"general-purpose","description":"do a thing"}"#,
1158 )?;
1159
1160 let store_dir = TempDir::new()?;
1161 let store = Store::open_local(store_dir.path()).await?;
1162 let adapter = ClaudeCodeAdapter::new(corpus.path());
1163
1164 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1165 assert_eq!(
1166 summary.dropped_sessions, 0,
1167 "subagent file must NOT collide with parent (pre-fix this was the project-immutable rejection)"
1168 );
1169
1170 let parent = store
1171 .get_session(parent_uuid)
1172 .await?
1173 .expect("parent session should ingest as the bare uuid");
1174 assert_eq!(parent.session.source_agent, "claude-code");
1175 assert_eq!(parent.session.parent_session_id, None);
1176
1177 let child_id = format!("{parent_uuid}/agent-{agent_hash}");
1178 let child = store
1179 .get_session(&child_id)
1180 .await?
1181 .expect("subagent session must surface under the derived id");
1182 assert_eq!(
1183 child.session.source_agent, "claude-code/general-purpose",
1184 "agent_type from .meta.json should suffix the source_agent label"
1185 );
1186 assert_eq!(
1187 child.session.parent_session_id.as_deref(),
1188 Some(parent_uuid),
1189 "subagent must link back to parent via parent_session_id",
1190 );
1191 let subagent_meta = child
1192 .session
1193 .options
1194 .get("subagent")
1195 .expect("options.subagent must carry the hash + verbatim meta.json");
1196 assert_eq!(subagent_meta["hash"], serde_json::json!(agent_hash));
1197 assert_eq!(
1198 subagent_meta["meta"]["agentType"],
1199 serde_json::json!("general-purpose")
1200 );
1201 assert_eq!(
1202 subagent_meta["meta"]["description"],
1203 serde_json::json!("do a thing")
1204 );
1205 Ok(())
1206 }
1207
1208 #[tokio::test(flavor = "multi_thread")]
1212 async fn subagent_without_meta_falls_back_to_generic_label() -> anyhow::Result<()> {
1213 let corpus = TempDir::new()?;
1214 let project_dir = corpus.path().join("-tmp-pond-test");
1215 let parent_uuid = "22222222-2222-2222-2222-222222222222";
1216 let agent_hash = "deadbeef";
1217 std::fs::create_dir_all(project_dir.join(parent_uuid).join("subagents"))?;
1218 let row = serde_json::json!({
1219 "type": "user",
1220 "uuid": "u-sub-only",
1221 "sessionId": parent_uuid,
1222 "cwd": "/tmp/pond-test",
1223 "timestamp": "2026-05-16T00:00:00.000Z",
1224 "message": {"role": "user", "content": "no meta sibling here"},
1225 });
1226 std::fs::write(
1227 project_dir
1228 .join(parent_uuid)
1229 .join("subagents")
1230 .join(format!("agent-{agent_hash}.jsonl")),
1231 format!("{row}\n"),
1232 )?;
1233
1234 let store_dir = TempDir::new()?;
1235 let store = Store::open_local(store_dir.path()).await?;
1236 let adapter = ClaudeCodeAdapter::new(corpus.path());
1237 let _summary =
1238 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1239
1240 let child = store
1241 .get_session(&format!("{parent_uuid}/agent-{agent_hash}"))
1242 .await?
1243 .expect("derived child id even without meta");
1244 assert_eq!(child.session.source_agent, "claude-code/subagent");
1245 Ok(())
1246 }
1247
1248 #[tokio::test(flavor = "multi_thread")]
1253 async fn replay_duplicates_are_dedup_at_adapter_layer() -> anyhow::Result<()> {
1254 let corpus = TempDir::new()?;
1255 let project_dir = corpus.path().join("-tmp-pond-test");
1256 std::fs::create_dir_all(&project_dir)?;
1257 let session_uuid = "33333333-3333-3333-3333-333333333333";
1258 let dup_uuid = "u-shared-1";
1259 let row = serde_json::json!({
1260 "type": "user",
1261 "uuid": dup_uuid,
1262 "sessionId": session_uuid,
1263 "cwd": "/tmp/pond-test",
1264 "timestamp": "2026-05-16T00:00:00.000Z",
1265 "message": {"role": "user", "content": "replayed three times"},
1266 });
1267 let body = format!("{row}\n{row}\n{row}\n");
1269 std::fs::write(project_dir.join(format!("{session_uuid}.jsonl")), body)?;
1270
1271 let store_dir = TempDir::new()?;
1272 let store = Store::open_local(store_dir.path()).await?;
1273 let adapter = ClaudeCodeAdapter::new(corpus.path());
1274 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1275
1276 assert_eq!(
1277 summary.dropped_events, 0,
1278 "adapter must dedupe replays before they reach the validator"
1279 );
1280 assert!(
1281 !summary
1282 .drop_reasons
1283 .contains_key(crate::sessions::DROP_REASON_DUPLICATE_MESSAGE_ID),
1284 "duplicate_message_id bucket stays empty when adapter does its job"
1285 );
1286 Ok(())
1287 }
1288
1289 #[tokio::test(flavor = "multi_thread")]
1293 async fn tool_result_name_resolves_from_prior_tool_use_in_same_file() -> anyhow::Result<()> {
1294 let corpus = TempDir::new()?;
1295 let project_dir = corpus.path().join("-tmp-pond-test");
1296 std::fs::create_dir_all(&project_dir)?;
1297 let session_uuid = "44444444-4444-4444-4444-444444444444";
1298 let call_id = "toolu_test_01";
1299
1300 let tool_use_row = serde_json::json!({
1301 "type": "assistant",
1302 "uuid": "u-call",
1303 "sessionId": session_uuid,
1304 "cwd": "/tmp/pond-test",
1305 "timestamp": "2026-05-16T00:00:00.000Z",
1306 "message": {
1307 "role": "assistant",
1308 "content": [{
1309 "type": "tool_use",
1310 "id": call_id,
1311 "name": "Edit",
1312 "input": {"file_path": "/tmp/foo"},
1313 }],
1314 },
1315 });
1316 let tool_result_row = serde_json::json!({
1317 "type": "user",
1318 "uuid": "u-result",
1319 "sessionId": session_uuid,
1320 "cwd": "/tmp/pond-test",
1321 "timestamp": "2026-05-16T00:00:01.000Z",
1322 "message": {
1323 "role": "user",
1324 "content": [{
1325 "type": "tool_result",
1326 "tool_use_id": call_id,
1327 "content": "ok",
1328 }],
1329 },
1330 });
1331 std::fs::write(
1332 project_dir.join(format!("{session_uuid}.jsonl")),
1333 format!("{tool_use_row}\n{tool_result_row}\n"),
1334 )?;
1335
1336 let store_dir = TempDir::new()?;
1337 let store = Store::open_local(store_dir.path()).await?;
1338 let adapter = ClaudeCodeAdapter::new(corpus.path());
1339 let _summary =
1340 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1341 let session = store
1342 .get_session(session_uuid)
1343 .await?
1344 .expect("session ingests");
1345
1346 let mut saw_call = false;
1347 let mut saw_result = false;
1348 for stored in &session.messages {
1349 for part in &stored.parts {
1350 match &part.kind {
1351 PartKind::ToolCall {
1352 call_id: cid, name, ..
1353 } => {
1354 assert_eq!(cid.as_ref().map(|e| e.as_str()), Some(call_id));
1355 assert_eq!(
1356 name.as_ref().map(|e| e.as_str()),
1357 Some("Edit"),
1358 "tool_use carries the name directly"
1359 );
1360 saw_call = true;
1361 }
1362 PartKind::ToolResult {
1363 call_id: cid, name, ..
1364 } => {
1365 assert_eq!(cid.as_ref().map(|e| e.as_str()), Some(call_id));
1366 assert_eq!(
1367 name.as_ref().map(|e| e.as_str()),
1368 Some("Edit"),
1369 "tool_result resolves the name via the per-file map (was 'unknown' pre-2026-05-16)"
1370 );
1371 saw_result = true;
1372 }
1373 _ => {}
1374 }
1375 }
1376 }
1377 assert!(saw_call && saw_result, "both parts must be present");
1378 Ok(())
1379 }
1380
1381 #[test]
1385 fn user_text_provenance_separates_prompts_from_harness_injection() {
1386 let prompt = json!({"type": "user", "uuid": "u1"});
1387 assert_eq!(
1388 user_text_provenance(&prompt, "please refactor the parser"),
1389 Provenance::Conversational,
1390 );
1391
1392 let notification = json!({"type": "user", "uuid": "u2"});
1393 assert_eq!(
1394 user_text_provenance(
1395 ¬ification,
1396 "<task-notification>background task done</task-notification>",
1397 ),
1398 Provenance::Injected,
1399 );
1400
1401 let meta = json!({"type": "user", "uuid": "u3", "isMeta": true});
1402 assert_eq!(
1403 user_text_provenance(&meta, "expanded skill body"),
1404 Provenance::Injected,
1405 );
1406 }
1407
1408 #[tokio::test(flavor = "multi_thread")]
1412 async fn task_notification_message_yields_injected_parts() -> anyhow::Result<()> {
1413 let corpus = TempDir::new()?;
1414 let project_dir = corpus.path().join("-tmp-pond-test");
1415 std::fs::create_dir_all(&project_dir)?;
1416 let session_uuid = "66666666-6666-6666-6666-666666666666";
1417 let prompt = serde_json::json!({
1418 "type": "user",
1419 "uuid": "u-prompt",
1420 "sessionId": session_uuid,
1421 "cwd": "/tmp/pond-test",
1422 "timestamp": "2026-05-16T00:00:00.000Z",
1423 "message": {"role": "user", "content": "genuine human prompt"},
1424 });
1425 let notification = serde_json::json!({
1426 "type": "user",
1427 "uuid": "u-notify",
1428 "sessionId": session_uuid,
1429 "cwd": "/tmp/pond-test",
1430 "timestamp": "2026-05-16T00:00:01.000Z",
1431 "message": {
1432 "role": "user",
1433 "content": "<task-notification>a background task finished</task-notification>",
1434 },
1435 });
1436 std::fs::write(
1437 project_dir.join(format!("{session_uuid}.jsonl")),
1438 format!("{prompt}\n{notification}\n"),
1439 )?;
1440
1441 let store_dir = TempDir::new()?;
1442 let store = Store::open_local(store_dir.path()).await?;
1443 let adapter = ClaudeCodeAdapter::new(corpus.path());
1444 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1445
1446 let session = store
1447 .get_session(session_uuid)
1448 .await?
1449 .expect("session ingests");
1450 let mut saw_prompt = false;
1451 let mut saw_notification = false;
1452 for stored in &session.messages {
1453 for part in &stored.parts {
1454 if stored.message.id() == "u-prompt" {
1455 assert_eq!(part.provenance, crate::wire::Provenance::Conversational);
1456 saw_prompt = true;
1457 }
1458 if stored.message.id() == "u-notify" {
1459 assert_eq!(part.provenance, crate::wire::Provenance::Injected);
1460 saw_notification = true;
1461 }
1462 }
1463 }
1464 assert!(saw_prompt && saw_notification, "both messages present");
1465 Ok(())
1466 }
1467
1468 #[tokio::test(flavor = "multi_thread")]
1472 async fn orphan_tool_result_yields_name_none_not_unknown_sentinel() -> anyhow::Result<()> {
1473 let corpus = TempDir::new()?;
1474 let project_dir = corpus.path().join("-tmp-pond-test");
1475 std::fs::create_dir_all(&project_dir)?;
1476 let session_uuid = "55555555-5555-5555-5555-555555555555";
1477
1478 let row = serde_json::json!({
1480 "type": "user",
1481 "uuid": "u-orphan",
1482 "sessionId": session_uuid,
1483 "cwd": "/tmp/pond-test",
1484 "timestamp": "2026-05-16T00:00:00.000Z",
1485 "message": {
1486 "role": "user",
1487 "content": [{
1488 "type": "tool_result",
1489 "tool_use_id": "toolu_orphan",
1490 "content": "result body, no matching call",
1491 }],
1492 },
1493 });
1494 std::fs::write(
1495 project_dir.join(format!("{session_uuid}.jsonl")),
1496 format!("{row}\n"),
1497 )?;
1498
1499 let store_dir = TempDir::new()?;
1500 let store = Store::open_local(store_dir.path()).await?;
1501 let adapter = ClaudeCodeAdapter::new(corpus.path());
1502 let _summary =
1503 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1504 let session = store
1505 .get_session(session_uuid)
1506 .await?
1507 .expect("session ingests");
1508 let mut found = false;
1509 for stored in &session.messages {
1510 for part in &stored.parts {
1511 if let PartKind::ToolResult { name, call_id, .. } = &part.kind {
1512 assert_eq!(call_id.as_ref().map(|e| e.as_str()), Some("toolu_orphan"));
1513 assert!(
1514 name.is_none(),
1515 "orphan tool_result must be name=None, not synthesized 'unknown'",
1516 );
1517 found = true;
1518 }
1519 }
1520 }
1521 assert!(found, "orphan tool_result part must be present");
1522 Ok(())
1524 }
1525}