1use std::{
10 collections::{HashMap, HashSet},
11 path::{Path, PathBuf},
12};
13
14use anyhow::Context as _;
15use chrono::{DateTime, SecondsFormat, Utc};
16use serde_json::{Value, json};
17
18use crate::{
19 sessions::IngestEvent,
20 wire::{FileData, Message, Part, PartKind, Provenance, ProviderOptions, Session},
21};
22
23use super::{
24 Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
25 RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
26 empty_options,
27 extract::{
28 Extracted, Source, extract_compact_repr, extract_raw_record, extract_self_str, extract_str,
29 },
30 extracted_text,
31 jsonl::{BoundedRow, JsonlTree, jsonl_tree_discover, jsonl_tree_events, source_line},
32 jsonl_bytes, part_id, part_ordinal, raw_record,
33};
34
/// Per-file state threaded through `JsonlTree::events_from_row` while one transcript file
/// is being ingested.
#[derive(Debug, Default)]
pub(crate) struct FileState {
    /// `uuid`s of rows already emitted for this file; a later row repeating one of these
    /// is dropped so replayed rows do not produce duplicate events.
    seen_uuids: HashSet<String>,
    /// Tool name for every `tool_use` / `server_tool_use` block id seen so far, used to
    /// label the matching `tool_result` parts.
    tool_call_names: HashMap<String, Extracted<String>>,
}
60
61const NAME: &str = "claude-code";
65
66pub struct ClaudeCodeFactory;
69
70impl AdapterFactory for ClaudeCodeFactory {
71 fn name(&self) -> &'static str {
72 NAME
73 }
74
75 fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
76 Ok(Box::new(ClaudeCodeAdapter::new(config_path(NAME, config)?)))
77 }
78
79 fn probe_default(&self, env: &Env) -> Option<Value> {
80 let path = env.home.join(".claude").join("projects");
81 path.exists().then(|| json!({ "path": path }))
82 }
83
84 fn serialize(
85 &self,
86 session: &crate::sessions::SessionWithMessages,
87 fidelity: RestoreFidelity,
88 ) -> Result<Vec<RestoredFile>, AdapterError> {
89 serialize_session(session, fidelity)
90 }
91}
92
93fn serialize_session(
94 session: &crate::sessions::SessionWithMessages,
95 fidelity: RestoreFidelity,
96) -> Result<Vec<RestoredFile>, AdapterError> {
97 let mut messages = session.messages.clone();
98 if fidelity == RestoreFidelity::Native {
99 messages.sort_by(|left, right| {
100 source_line(left.message.options())
101 .cmp(&source_line(right.message.options()))
102 .then_with(|| by_timestamp_then_id(left, right))
103 });
104 } else {
105 messages.sort_by(by_timestamp_then_id);
106 }
107 let mut records = Vec::with_capacity(messages.len());
111 let mut parent_uuid = None::<String>;
112 for message in &messages {
113 if fidelity == RestoreFidelity::Native
114 && let Some(raw) = raw_record(message.message.options())
115 {
116 parent_uuid = raw
117 .get("uuid")
118 .and_then(Value::as_str)
119 .map(ToOwned::to_owned)
120 .or(parent_uuid);
121 records.push(raw);
122 continue;
123 }
124 let Some(record) = claude_record(session, message, parent_uuid.as_deref()) else {
127 continue;
128 };
129 parent_uuid = record
130 .get("uuid")
131 .and_then(Value::as_str)
132 .map(ToOwned::to_owned);
133 records.push(record);
134 }
135
136 let mut files = vec![RestoredFile::new(
137 claude_relative_path(session),
138 jsonl_bytes(NAME, &records)?,
139 fidelity,
140 )];
141 if session.session.parent_session_id.is_some()
142 && let Some(meta) = subagent_meta_record(session)
143 {
144 let mut meta_path = files[0].relative_path.clone();
145 meta_path.set_extension("meta.json");
146 files.push(RestoredFile::new(
147 meta_path,
148 serde_json::to_vec(&meta).map_err(|err| {
149 AdapterError::schema(
150 NAME,
151 &session.session.id,
152 format!("json encode failed: {err}"),
153 )
154 })?,
155 fidelity,
156 ));
157 }
158 Ok(files)
159}
160
161fn claude_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
162 let encoded_project = session
163 .session
164 .options
165 .get("source")
166 .and_then(|source| source.get("project_dir"))
167 .and_then(Value::as_str)
168 .map(ToOwned::to_owned)
169 .unwrap_or_else(|| encode_project(&session.session.project));
170 if let Some(parent) = &session.session.parent_session_id {
171 let agent = session
172 .session
173 .id
174 .rsplit('/')
175 .next()
176 .unwrap_or(&session.session.id);
177 return PathBuf::from(encoded_project)
178 .join(parent)
179 .join("subagents")
180 .join(format!("{agent}.jsonl"));
181 }
182 PathBuf::from(encoded_project).join(format!("{}.jsonl", session.session.id))
183}
184
/// Encodes a workspace path into a project directory name by replacing every `/` and `.`
/// with `-`.
fn encode_project(project: &str) -> String {
    project
        .chars()
        .map(|ch| match ch {
            '/' | '.' => '-',
            other => other,
        })
        .collect()
}
188
189fn subagent_meta_record(session: &crate::sessions::SessionWithMessages) -> Option<Value> {
190 let meta = session.session.options.get("subagent")?.get("meta")?;
194 meta.is_object().then(|| meta.clone())
195}
196
197fn claude_record(
198 session: &crate::sessions::SessionWithMessages,
199 message: &crate::sessions::MessageWithParts,
200 parent_uuid: Option<&str>,
201) -> Option<Value> {
202 let row_role = match &message.message {
210 Message::System { .. } => return None,
211 Message::User { .. } | Message::Tool { .. } => "user",
212 Message::Assistant { .. } => "assistant",
213 };
214 let mut envelope = serde_json::Map::new();
215 envelope.insert("role".to_owned(), Value::String(row_role.to_owned()));
216 if row_role == "assistant" {
217 envelope.insert("type".to_owned(), Value::String("message".to_owned()));
220 }
221 envelope.insert(
222 "content".to_owned(),
223 Value::Array(message.parts.iter().map(claude_part).collect()),
224 );
225 Some(json!({
226 "parentUuid": parent_uuid,
227 "isSidechain": false,
228 "userType": "external",
229 "cwd": &*session.session.project,
230 "sessionId": &session.session.id,
231 "type": row_role,
232 "message": Value::Object(envelope),
233 "uuid": message.message.id(),
234 "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
235 }))
236}
237
238fn claude_part(part: &Part) -> Value {
239 match &part.kind {
240 PartKind::Text { text } => json!({"type": "text", "text": extracted_text(text)}),
241 PartKind::Reasoning { text } => {
242 json!({"type": "thinking", "thinking": extracted_text(text)})
243 }
244 PartKind::ToolCall {
245 call_id,
246 name,
247 params,
248 provider_executed,
249 } => json!({
250 "type": if *provider_executed { "server_tool_use" } else { "tool_use" },
251 "id": extracted_text(call_id),
252 "name": extracted_text(name),
253 "input": params,
254 }),
255 PartKind::ToolResult {
256 call_id,
257 is_failure,
258 result,
259 ..
260 } => json!({
261 "type": "tool_result",
262 "tool_use_id": extracted_text(call_id),
263 "is_error": is_failure,
264 "content": result,
265 }),
266 PartKind::File {
267 media_type,
268 file_name,
269 data,
270 } => json!({
271 "type": "file",
272 "media_type": media_type,
273 "file_name": file_name,
274 "source": file_source(data),
275 }),
276 other => {
277 json!({"type": "text", "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null))})
278 }
279 }
280}
281
282fn file_source(data: &FileData) -> Value {
283 match data {
284 FileData::String(value) => json!({"type": "text", "data": value}),
285 FileData::Bytes(value) => json!({"type": "base64", "data": value}),
286 FileData::Url(value) => json!({"type": "url", "url": value}),
287 }
288}
289
290#[derive(Debug, Clone)]
293pub struct ClaudeCodeAdapter {
294 root: PathBuf,
295}
296
297impl ClaudeCodeAdapter {
298 pub fn new(root: impl Into<PathBuf>) -> Self {
299 Self { root: root.into() }
300 }
301}
302
303impl Adapter for ClaudeCodeAdapter {
304 fn discover(&self) -> DiscoverFuture<'_> {
305 jsonl_tree_discover(self)
306 }
307
308 fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
309 jsonl_tree_events(self, oracle)
310 }
311}
312
313impl JsonlTree for ClaudeCodeAdapter {
314 type State = FileState;
315
316 fn name(&self) -> &'static str {
317 NAME
318 }
319
320 fn root(&self) -> &Path {
321 &self.root
322 }
323
324 fn peek_session_id(&self, path: &Path, first_line: &str) -> Option<String> {
325 if let Some((parent_uuid, agent_hash)) = subagent_ids(path) {
326 return Some(format!("{parent_uuid}/agent-{agent_hash}"));
327 }
328 let row: Value = serde_json::from_str(first_line).ok()?;
329 row.get("sessionId")?.as_str().map(ToOwned::to_owned)
330 }
331
332 fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
333 session_from_rows(path, rows)
334 }
335
336 fn events_from_row(
337 &self,
338 session: &Session,
339 row: &BoundedRow,
340 state: &mut Self::State,
341 ) -> Result<Vec<IngestEvent>, String> {
342 if let Some(uuid) = row.value.get("uuid").and_then(Value::as_str)
343 && !state.seen_uuids.insert(uuid.to_owned())
344 {
345 return Ok(Vec::new());
346 }
347 capture_tool_call_names(&row.value, &mut state.tool_call_names);
348 events_from_row(&session.id, row.line, &row.value, session.created_at, state)
349 }
350}
351
352fn capture_tool_call_names(row: &Value, map: &mut HashMap<String, Extracted<String>>) {
357 let Some(items) = row
358 .get("message")
359 .and_then(|message| message.get("content"))
360 .and_then(Value::as_array)
361 else {
362 return;
363 };
364 for item in items {
365 let kind = item.get("type").and_then(Value::as_str);
366 if !matches!(kind, Some("tool_use") | Some("server_tool_use")) {
367 continue;
368 }
369 let (Some(id), Some(name)) = (item.str_field("id"), extract_str(item, "name")) else {
370 continue;
371 };
372 map.insert(id.to_owned(), name);
373 }
374}
375
376fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
377 let path_display = path.display().to_string();
378 let mut created_at = None;
379 let mut project: Option<Extracted<String>> = None;
380 let mut version = None;
381 for row in rows {
382 if created_at.is_none() {
383 created_at = parse_timestamp(&row.value).ok();
384 }
385 if project.is_none() {
386 project = extract_str(&row.value, "cwd");
387 }
388 if version.is_none() {
389 version = row
390 .value
391 .get("version")
392 .and_then(Value::as_str)
393 .map(ToOwned::to_owned);
394 }
395 }
396
397 let first = rows
398 .first()
399 .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
400 let at_first = format!("{path_display}:{}", first.line);
401 let raw_session_id = first
402 .value
403 .get("sessionId")
404 .and_then(Value::as_str)
405 .ok_or_else(|| {
406 AdapterError::schema(
407 NAME,
408 at_first.clone(),
409 format!("line {} missing sessionId", first.line),
410 )
411 })?
412 .to_owned();
413 let created_at = created_at.ok_or_else(|| {
414 AdapterError::schema(NAME, at_first, "session has no parseable timestamp")
415 })?;
416
417 let subagent = subagent_descriptor(path);
425 let project_dir = source_project_dir(path, subagent.is_some());
426 let (session_id, parent_session_id, source_agent, subagent_options) = match subagent {
427 Some(SubagentDescriptor {
428 parent_uuid,
429 agent_hash,
430 agent_type,
431 meta,
432 }) => {
433 let child_id = format!("{parent_uuid}/agent-{agent_hash}");
434 let agent_label = agent_type
435 .as_deref()
436 .map(|t| format!("claude-code/{t}"))
437 .unwrap_or_else(|| "claude-code/subagent".to_owned());
438 let metadata = json!({
442 "hash": agent_hash,
443 "raw_session_id": raw_session_id,
444 "meta": meta,
445 });
446 (child_id, Some(parent_uuid), agent_label, Some(metadata))
447 }
448 None => (raw_session_id, None, "claude-code".to_owned(), None),
449 };
450
451 let project = match project {
452 Some(value) => value,
453 None => {
454 let decoded = path
455 .parent()
456 .and_then(|p| p.file_name())
457 .and_then(|n| n.to_str())
458 .map(|s| s.replace('-', "/"))
459 .ok_or_else(|| {
460 AdapterError::schema(
461 NAME,
462 path_display.clone(),
463 "no `cwd` field in any row and source path is not UTF-8",
464 )
465 })?;
466 extract_self_str(&Value::String(decoded)).ok_or_else(|| {
467 AdapterError::schema(
468 NAME,
469 path_display.clone(),
470 "internal: Value::String produced None from Source::as_str",
471 )
472 })?
473 }
474 };
475
476 let mut options = ProviderOptions::new();
477 options.insert(
478 "source".to_owned(),
479 json!({
480 "adapter": "claude-code",
481 "version": version,
482 "project_dir": project_dir,
483 "workspace_path": &*project,
484 }),
485 );
486 if let Some(metadata) = subagent_options {
487 options.insert("subagent".to_owned(), metadata);
488 }
489
490 Ok(Session {
491 id: session_id,
492 parent_session_id,
493 parent_message_id: None,
494 source_agent,
495 created_at,
496 project,
497 options,
498 })
499}
500
/// Returns the Claude project directory name that contains the transcript at `path`.
///
/// Top-level transcripts sit directly in the project directory. Subagent transcripts sit
/// at `<project>/<parent>/subagents/agent-*.jsonl`, three levels below it. Returns `None`
/// when that ancestor does not exist or its name is not UTF-8.
fn source_project_dir(path: &Path, is_subagent: bool) -> Option<String> {
    let levels_up = if is_subagent { 3 } else { 1 };
    let dir = path.ancestors().nth(levels_up)?;
    dir.file_name()?.to_str().map(str::to_owned)
}
512
/// Identity and metadata of a subagent transcript, derived from its path and its optional
/// sibling `agent-<hash>.meta.json` file.
struct SubagentDescriptor {
    /// Name of the directory that contains `subagents/`; used as the parent session id.
    parent_uuid: String,
    /// The `<hash>` portion of the transcript file name `agent-<hash>.jsonl`.
    agent_hash: String,
    /// `agentType` from the meta file, when the file parsed and the field is a string.
    agent_type: Option<String>,
    /// The parsed meta file kept verbatim; `None` when missing, unreadable, or unparseable.
    meta: Option<Value>,
}
524
/// Extracts `(parent_uuid, agent_hash)` from a path shaped like
/// `.../<parent_uuid>/subagents/agent-<agent_hash>.jsonl`; `None` for any other shape or
/// for non-UTF-8 components.
fn subagent_ids(path: &Path) -> Option<(String, String)> {
    let file_name = path.file_name()?.to_str()?;
    let hash = file_name.strip_prefix("agent-")?.strip_suffix(".jsonl")?;

    let container = path.parent()?;
    let in_subagents_dir = container.file_name()?.to_str()? == "subagents";
    if !in_subagents_dir {
        return None;
    }

    let parent = container.parent()?.file_name()?.to_str()?;
    Some((parent.to_owned(), hash.to_owned()))
}
541
542fn subagent_descriptor(path: &Path) -> Option<SubagentDescriptor> {
545 let (parent_uuid, agent_hash) = subagent_ids(path)?;
546 let meta_path = path.parent()?.join(format!("agent-{agent_hash}.meta.json"));
547 let (agent_type, meta) = match std::fs::read(&meta_path) {
548 Ok(bytes) => match serde_json::from_slice::<Value>(&bytes) {
549 Ok(value) => (
550 value
551 .get("agentType")
552 .and_then(Value::as_str)
553 .map(ToOwned::to_owned),
554 Some(value),
555 ),
556 Err(error) => {
557 tracing::debug!(
558 target: "pond::adapter::claude_code",
559 meta = %meta_path.display(),
560 %error,
561 "subagent .meta.json present but unparseable; falling back to 'claude-code/subagent'",
562 );
563 (None, None)
564 }
565 },
566 Err(error) if error.kind() == std::io::ErrorKind::NotFound => (None, None),
567 Err(error) => {
568 tracing::debug!(
569 target: "pond::adapter::claude_code",
570 meta = %meta_path.display(),
571 %error,
572 "subagent .meta.json IO error; falling back to 'claude-code/subagent'",
573 );
574 (None, None)
575 }
576 };
577
578 Some(SubagentDescriptor {
579 parent_uuid,
580 agent_hash,
581 agent_type,
582 meta,
583 })
584}
585
586fn events_from_row(
587 session_id: &str,
588 line: usize,
589 row: &Value,
590 default_timestamp: DateTime<Utc>,
591 state: &FileState,
592) -> Result<Vec<IngestEvent>, String> {
593 let timestamp = parse_timestamp(row).unwrap_or(default_timestamp);
594 let uuid = row
595 .get("uuid")
596 .and_then(Value::as_str)
597 .map_or_else(|| format!("{session_id}:{line}"), ToOwned::to_owned);
598
599 if let Some(message_value) = row.get("message") {
600 return message_events(
601 session_id,
602 &uuid,
603 timestamp,
604 row,
605 message_value,
606 state,
607 line,
608 );
609 }
610
611 let raw_type = row.get("type").and_then(Value::as_str);
618 let content = if raw_type == Some("attachment") {
619 row.get("attachment")
620 .and_then(attachment_content)
621 .or_else(|| Some(extract_compact_repr(row)))
622 } else {
623 extract_str(row, "subtype").or_else(|| extract_str(row, "type"))
624 };
625 let message = Message::System {
626 id: uuid,
627 session_id: session_id.to_owned(),
628 timestamp,
629 content,
630 options: row_options(row, line),
631 };
632 Ok(vec![IngestEvent::Message(message)])
633}
634
635fn message_events(
636 session_id: &str,
637 uuid: &str,
638 timestamp: DateTime<Utc>,
639 row: &Value,
640 message_value: &Value,
641 state: &FileState,
642 line: usize,
643) -> Result<Vec<IngestEvent>, String> {
644 let role = message_value
645 .get("role")
646 .and_then(Value::as_str)
647 .ok_or_else(|| "message missing role".to_owned())?;
648 let content = message_value.get("content").unwrap_or(&Value::Null);
649 let mut parts = Vec::new();
650 let message = match (role, content) {
651 ("user", Value::String(text)) => {
652 let provenance = user_text_provenance(row, text);
656 parts.push(text_part(
657 session_id,
658 uuid,
659 0,
660 extract_self_str(content),
661 provenance,
662 ));
663 Message::User {
664 id: uuid.to_owned(),
665 session_id: session_id.to_owned(),
666 timestamp,
667 options: row_options(row, line),
668 }
669 }
670 ("user", Value::Array(items)) if items.iter().all(is_tool_result) => {
671 let source_tool_result = row.get("toolUseResult").cloned();
672 parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
673 tool_result_part(
674 session_id,
675 uuid,
676 ordinal,
677 item,
678 source_tool_result.as_ref(),
679 state,
680 )
681 }));
682 Message::Tool {
683 id: uuid.to_owned(),
684 session_id: session_id.to_owned(),
685 timestamp,
686 options: row_options(row, line),
687 }
688 }
689 ("user", Value::Array(items)) => {
690 let provenance = user_array_provenance(row, items);
693 parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
694 user_part(session_id, uuid, ordinal, item, state, provenance)
695 }));
696 Message::User {
697 id: uuid.to_owned(),
698 session_id: session_id.to_owned(),
699 timestamp,
700 options: row_options(row, line),
701 }
702 }
703 ("assistant", Value::Array(items)) => {
704 parts.extend(
705 items
706 .iter()
707 .enumerate()
708 .map(|(ordinal, item)| assistant_part(session_id, uuid, ordinal, item)),
709 );
710 Message::Assistant {
711 id: uuid.to_owned(),
712 session_id: session_id.to_owned(),
713 timestamp,
714 options: assistant_options(row, message_value, line),
715 }
716 }
717 ("system", Value::String(_)) => Message::System {
718 id: uuid.to_owned(),
719 session_id: session_id.to_owned(),
720 timestamp,
721 content: extract_self_str(content),
722 options: row_options(row, line),
723 },
724 ("system", _) => Message::System {
725 id: uuid.to_owned(),
726 session_id: session_id.to_owned(),
727 timestamp,
728 content: Some(extract_compact_repr(message_value)),
733 options: row_options(row, line),
734 },
735 (other, _) => {
736 return Err(format!("unsupported message role {other}"));
737 }
738 };
739
740 let mut events = Vec::with_capacity(parts.len() + 1);
741 events.push(IngestEvent::Message(message));
742 events.extend(parts.into_iter().map(IngestEvent::Part));
743 Ok(events)
744}
745
746fn text_part(
747 session_id: &str,
748 message_id: &str,
749 ordinal: usize,
750 text: Option<Extracted<String>>,
751 provenance: Provenance,
752) -> Part {
753 Part {
754 session_id: session_id.to_owned(),
755 id: part_id(message_id, ordinal),
756 message_id: message_id.to_owned(),
757 ordinal: part_ordinal(ordinal),
758 provenance,
759 options: empty_options(),
760 kind: PartKind::Text { text },
761 }
762}
763
764fn user_part(
765 session_id: &str,
766 message_id: &str,
767 ordinal: usize,
768 value: &Value,
769 state: &FileState,
770 provenance: Provenance,
771) -> Part {
772 match value.get("type").and_then(Value::as_str) {
773 Some("text") => text_part(
774 session_id,
775 message_id,
776 ordinal,
777 extract_str(value, "text"),
778 provenance,
779 ),
780 Some("image") | Some("file") => {
781 file_part(session_id, message_id, ordinal, value, provenance)
782 }
783 Some("tool_result") => {
784 tool_result_part(session_id, message_id, ordinal, value, None, state)
785 }
786 _ => text_part(
790 session_id,
791 message_id,
792 ordinal,
793 Some(extract_compact_repr(value)),
794 provenance,
795 ),
796 }
797}
798
799fn assistant_part(session_id: &str, message_id: &str, ordinal: usize, value: &Value) -> Part {
800 match value.get("type").and_then(Value::as_str) {
804 Some("text") => text_part(
805 session_id,
806 message_id,
807 ordinal,
808 extract_str(value, "text"),
809 Provenance::Conversational,
810 ),
811 Some("thinking") => Part {
812 session_id: session_id.to_owned(),
813 id: part_id(message_id, ordinal),
814 message_id: message_id.to_owned(),
815 ordinal: part_ordinal(ordinal),
816 provenance: Provenance::Conversational,
817 options: signature_options(value),
818 kind: PartKind::Reasoning {
819 text: extract_str(value, "thinking"),
820 },
821 },
822 Some("tool_use") => Part {
823 session_id: session_id.to_owned(),
824 id: part_id(message_id, ordinal),
825 message_id: message_id.to_owned(),
826 ordinal: part_ordinal(ordinal),
827 provenance: Provenance::Conversational,
828 options: empty_options(),
829 kind: PartKind::ToolCall {
830 call_id: extract_str(value, "id"),
831 name: extract_str(value, "name"),
832 params: value.get("input").cloned().unwrap_or(Value::Null),
833 provider_executed: false,
834 },
835 },
836 Some("server_tool_use") => Part {
837 session_id: session_id.to_owned(),
838 id: part_id(message_id, ordinal),
839 message_id: message_id.to_owned(),
840 ordinal: part_ordinal(ordinal),
841 provenance: Provenance::Conversational,
842 options: empty_options(),
843 kind: PartKind::ToolCall {
844 call_id: extract_str(value, "id"),
845 name: extract_str(value, "name"),
846 params: value.get("input").cloned().unwrap_or(Value::Null),
847 provider_executed: true,
848 },
849 },
850 Some("image") | Some("file") => file_part(
851 session_id,
852 message_id,
853 ordinal,
854 value,
855 Provenance::Conversational,
856 ),
857 _ => text_part(
860 session_id,
861 message_id,
862 ordinal,
863 Some(extract_compact_repr(value)),
864 Provenance::Conversational,
865 ),
866 }
867}
868
869fn tool_result_part(
870 session_id: &str,
871 message_id: &str,
872 ordinal: usize,
873 value: &Value,
874 source_tool_result: Option<&Value>,
875 state: &FileState,
876) -> Part {
877 let call_id = extract_str(value, "tool_use_id");
878 let name = value
884 .str_field("tool_use_id")
885 .and_then(|id| state.tool_call_names.get(id))
886 .cloned();
887 let result = value
888 .get("content")
889 .cloned()
890 .or_else(|| source_tool_result.cloned())
891 .unwrap_or(Value::Null);
892 Part {
893 session_id: session_id.to_owned(),
894 id: part_id(message_id, ordinal),
895 message_id: message_id.to_owned(),
896 ordinal: part_ordinal(ordinal),
897 provenance: Provenance::Injected,
900 options: empty_options(),
901 kind: PartKind::ToolResult {
902 call_id,
903 name,
904 is_failure: value
905 .get("is_error")
906 .and_then(Value::as_bool)
907 .unwrap_or(false),
908 result,
909 },
910 }
911}
912
913fn file_part(
914 session_id: &str,
915 message_id: &str,
916 ordinal: usize,
917 value: &Value,
918 provenance: Provenance,
919) -> Part {
920 let media_type = value
921 .get("media_type")
922 .or_else(|| value.get("mime_type"))
923 .and_then(Value::as_str)
924 .map(ToOwned::to_owned);
925 let file_name = value
926 .get("file_name")
927 .or_else(|| value.get("name"))
928 .and_then(Value::as_str)
929 .map(ToOwned::to_owned);
930 let data = if let Some(source) = value.get("source") {
931 if let Some(url) = source.get("url").and_then(Value::as_str) {
932 FileData::Url(url.to_owned())
933 } else if let Some(bytes) = source.get("data").and_then(Value::as_str) {
934 FileData::String(bytes.to_owned())
935 } else {
936 FileData::String(compact_json(source))
937 }
938 } else if let Some(url) = value.get("url").and_then(Value::as_str) {
939 FileData::Url(url.to_owned())
940 } else {
941 FileData::String(compact_json(value))
942 };
943
944 Part {
945 session_id: session_id.to_owned(),
946 id: part_id(message_id, ordinal),
947 message_id: message_id.to_owned(),
948 ordinal: part_ordinal(ordinal),
949 provenance,
950 options: empty_options(),
951 kind: PartKind::File {
952 media_type,
953 file_name,
954 data,
955 },
956 }
957}
958
959fn row_options(row: &Value, line: usize) -> ProviderOptions {
960 let mut options = ProviderOptions::new();
961 let source = json!({
962 "line": line,
963 "parent_uuid": row.get("parentUuid"),
964 "is_sidechain": row.get("isSidechain"),
965 "user_type": row.get("userType"),
966 "entrypoint": row.get("entrypoint"),
967 "cwd": row.get("cwd"),
968 "version": row.get("version"),
969 "git_branch": row.get("gitBranch"),
970 "request_id": row.get("requestId"),
971 "raw_type": row.get("type"),
972 "raw_record": extract_raw_record(row),
973 });
974 options.insert("source".to_owned(), source);
975 options
976}
977
978fn assistant_options(row: &Value, message_value: &Value, line: usize) -> ProviderOptions {
979 let mut options = row_options(row, line);
980 let anthropic = json!({
981 "id": message_value.get("id"),
982 "model": message_value.get("model"),
983 "stop_reason": message_value.get("stop_reason"),
984 "stop_sequence": message_value.get("stop_sequence"),
985 "usage": message_value.get("usage"),
986 });
987 options.insert("anthropic".to_owned(), anthropic);
988 options
989}
990
991fn signature_options(value: &Value) -> ProviderOptions {
992 let mut options = ProviderOptions::new();
993 if let Some(signature) = value.get("signature").and_then(Value::as_str) {
994 options.insert("anthropic".to_owned(), json!({"signature": signature}));
995 }
996 options
997}
998
999fn attachment_content(value: &Value) -> Option<Extracted<String>> {
1000 extract_str(value, "content").or_else(|| extract_str(value, "stdout"))
1001}
1002
1003fn parse_timestamp(value: &Value) -> anyhow::Result<DateTime<Utc>> {
1004 let timestamp = value
1005 .get("timestamp")
1006 .and_then(Value::as_str)
1007 .context("missing timestamp")?;
1008 Ok(DateTime::parse_from_rfc3339(timestamp)
1009 .context("invalid timestamp")?
1010 .with_timezone(&Utc))
1011}
1012
1013fn is_tool_result(value: &Value) -> bool {
1014 value.get("type").and_then(Value::as_str) == Some("tool_result")
1015}
1016
1017fn is_meta_row(row: &Value) -> bool {
1020 row.get("isMeta").and_then(Value::as_bool) == Some(true)
1021}
1022
/// True when user text, ignoring leading whitespace, starts with one of the wrapper
/// markers treated as injected rather than typed by the user.
fn is_injected_user_text(text: &str) -> bool {
    const INJECTED_PREFIXES: [&str; 7] = [
        "<task-notification>",
        "<command-name>",
        "<command-message>",
        "<command-args>",
        "<local-command-caveat>",
        "<local-command-stdout>",
        "[Request interrupted by user",
    ];
    let head = text.trim_start();
    INJECTED_PREFIXES
        .iter()
        .any(|&prefix| head.starts_with(prefix))
}
1036
1037fn user_text_provenance(row: &Value, text: &str) -> Provenance {
1040 if is_meta_row(row) || is_injected_user_text(text) {
1041 Provenance::Injected
1042 } else {
1043 Provenance::Conversational
1044 }
1045}
1046
1047fn user_array_provenance(row: &Value, items: &[Value]) -> Provenance {
1051 if is_meta_row(row) {
1052 return Provenance::Injected;
1053 }
1054 let wrapped = items.iter().any(|item| {
1055 item.get("type").and_then(Value::as_str) == Some("text")
1056 && item
1057 .get("text")
1058 .and_then(Value::as_str)
1059 .is_some_and(is_injected_user_text)
1060 });
1061 if wrapped {
1062 Provenance::Injected
1063 } else {
1064 Provenance::Conversational
1065 }
1066}
1067
1068#[cfg(test)]
1069mod tests {
1070 #![allow(clippy::expect_used, clippy::unwrap_used)]
1078
1079 use super::*;
1080 use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
1081 use tempfile::TempDir;
1082
1083 const FIXTURE_ROOT: &str = "tests/fixtures/adapter/claude_code/projects";
1084
1085 #[test]
1086 fn probe_default_finds_claude_projects_under_home() -> anyhow::Result<()> {
1087 crate::adapter::test_support::assert_probe_default(
1088 &ClaudeCodeFactory,
1089 &[".claude", "projects"],
1090 )
1091 }
1092
1093 #[tokio::test(flavor = "multi_thread")]
1094 async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
1095 let adapter = ClaudeCodeAdapter::new(FIXTURE_ROOT);
1096 crate::adapter::test_support::assert_native_restore(
1097 &ClaudeCodeFactory,
1098 &adapter,
1099 std::path::Path::new(FIXTURE_ROOT),
1100 )
1101 .await
1102 }
1103
1104 #[tokio::test(flavor = "multi_thread")]
1112 async fn subagent_file_derives_child_session_with_parent_link() -> anyhow::Result<()> {
1113 let corpus = TempDir::new()?;
1114 let project_dir = corpus.path().join("-tmp-pond-test");
1115 let parent_uuid = "11111111-1111-1111-1111-111111111111";
1116 let agent_hash = "abc123def456";
1117 std::fs::create_dir_all(project_dir.join(parent_uuid).join("subagents"))?;
1118
1119 let parent_row = serde_json::json!({
1121 "type": "user",
1122 "uuid": "u-parent-1",
1123 "sessionId": parent_uuid,
1124 "cwd": "/tmp/pond-test",
1125 "timestamp": "2026-05-16T00:00:00.000Z",
1126 "version": "2.1.121",
1127 "message": {"role": "user", "content": "hi parent"},
1128 });
1129 std::fs::write(
1130 project_dir.join(format!("{parent_uuid}.jsonl")),
1131 format!("{parent_row}\n"),
1132 )?;
1133
1134 let subagent_row = serde_json::json!({
1137 "type": "user",
1138 "uuid": "u-sub-1",
1139 "sessionId": parent_uuid,
1140 "cwd": "/tmp/pond-test",
1141 "isSidechain": true,
1142 "agentId": agent_hash,
1143 "timestamp": "2026-05-16T00:01:00.000Z",
1144 "version": "2.1.121",
1145 "message": {"role": "user", "content": "subagent prompt"},
1146 });
1147 std::fs::write(
1148 project_dir
1149 .join(parent_uuid)
1150 .join("subagents")
1151 .join(format!("agent-{agent_hash}.jsonl")),
1152 format!("{subagent_row}\n"),
1153 )?;
1154 std::fs::write(
1155 project_dir
1156 .join(parent_uuid)
1157 .join("subagents")
1158 .join(format!("agent-{agent_hash}.meta.json")),
1159 r#"{"agentType":"general-purpose","description":"do a thing"}"#,
1160 )?;
1161
1162 let store_dir = TempDir::new()?;
1163 let store = Store::open_local(store_dir.path()).await?;
1164 let adapter = ClaudeCodeAdapter::new(corpus.path());
1165
1166 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1167 assert_eq!(
1168 summary.dropped_sessions, 0,
1169 "subagent file must NOT collide with parent (pre-fix this was the project-immutable rejection)"
1170 );
1171
1172 let parent = store
1173 .get_session(parent_uuid)
1174 .await?
1175 .expect("parent session should ingest as the bare uuid");
1176 assert_eq!(parent.session.source_agent, "claude-code");
1177 assert_eq!(parent.session.parent_session_id, None);
1178
1179 let child_id = format!("{parent_uuid}/agent-{agent_hash}");
1180 let child = store
1181 .get_session(&child_id)
1182 .await?
1183 .expect("subagent session must surface under the derived id");
1184 assert_eq!(
1185 child.session.source_agent, "claude-code/general-purpose",
1186 "agent_type from .meta.json should suffix the source_agent label"
1187 );
1188 assert_eq!(
1189 child.session.parent_session_id.as_deref(),
1190 Some(parent_uuid),
1191 "subagent must link back to parent via parent_session_id",
1192 );
1193 let subagent_meta = child
1194 .session
1195 .options
1196 .get("subagent")
1197 .expect("options.subagent must carry the hash + verbatim meta.json");
1198 assert_eq!(subagent_meta["hash"], serde_json::json!(agent_hash));
1199 assert_eq!(
1200 subagent_meta["meta"]["agentType"],
1201 serde_json::json!("general-purpose")
1202 );
1203 assert_eq!(
1204 subagent_meta["meta"]["description"],
1205 serde_json::json!("do a thing")
1206 );
1207 Ok(())
1208 }
1209
1210 #[tokio::test(flavor = "multi_thread")]
1214 async fn subagent_without_meta_falls_back_to_generic_label() -> anyhow::Result<()> {
1215 let corpus = TempDir::new()?;
1216 let project_dir = corpus.path().join("-tmp-pond-test");
1217 let parent_uuid = "22222222-2222-2222-2222-222222222222";
1218 let agent_hash = "deadbeef";
1219 std::fs::create_dir_all(project_dir.join(parent_uuid).join("subagents"))?;
1220 let row = serde_json::json!({
1221 "type": "user",
1222 "uuid": "u-sub-only",
1223 "sessionId": parent_uuid,
1224 "cwd": "/tmp/pond-test",
1225 "timestamp": "2026-05-16T00:00:00.000Z",
1226 "message": {"role": "user", "content": "no meta sibling here"},
1227 });
1228 std::fs::write(
1229 project_dir
1230 .join(parent_uuid)
1231 .join("subagents")
1232 .join(format!("agent-{agent_hash}.jsonl")),
1233 format!("{row}\n"),
1234 )?;
1235
1236 let store_dir = TempDir::new()?;
1237 let store = Store::open_local(store_dir.path()).await?;
1238 let adapter = ClaudeCodeAdapter::new(corpus.path());
1239 let _summary =
1240 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1241
1242 let child = store
1243 .get_session(&format!("{parent_uuid}/agent-{agent_hash}"))
1244 .await?
1245 .expect("derived child id even without meta");
1246 assert_eq!(child.session.source_agent, "claude-code/subagent");
1247 Ok(())
1248 }
1249
1250 #[tokio::test(flavor = "multi_thread")]
1255 async fn replay_duplicates_are_dedup_at_adapter_layer() -> anyhow::Result<()> {
1256 let corpus = TempDir::new()?;
1257 let project_dir = corpus.path().join("-tmp-pond-test");
1258 std::fs::create_dir_all(&project_dir)?;
1259 let session_uuid = "33333333-3333-3333-3333-333333333333";
1260 let dup_uuid = "u-shared-1";
1261 let row = serde_json::json!({
1262 "type": "user",
1263 "uuid": dup_uuid,
1264 "sessionId": session_uuid,
1265 "cwd": "/tmp/pond-test",
1266 "timestamp": "2026-05-16T00:00:00.000Z",
1267 "message": {"role": "user", "content": "replayed three times"},
1268 });
1269 let body = format!("{row}\n{row}\n{row}\n");
1271 std::fs::write(project_dir.join(format!("{session_uuid}.jsonl")), body)?;
1272
1273 let store_dir = TempDir::new()?;
1274 let store = Store::open_local(store_dir.path()).await?;
1275 let adapter = ClaudeCodeAdapter::new(corpus.path());
1276 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1277
1278 assert_eq!(
1279 summary.dropped_events, 0,
1280 "adapter must dedupe replays before they reach the validator"
1281 );
1282 assert!(
1283 !summary
1284 .drop_reasons
1285 .contains_key(crate::sessions::DROP_REASON_DUPLICATE_MESSAGE_ID),
1286 "duplicate_message_id bucket stays empty when adapter does its job"
1287 );
1288 Ok(())
1289 }
1290
1291 #[tokio::test(flavor = "multi_thread")]
1295 async fn tool_result_name_resolves_from_prior_tool_use_in_same_file() -> anyhow::Result<()> {
1296 let corpus = TempDir::new()?;
1297 let project_dir = corpus.path().join("-tmp-pond-test");
1298 std::fs::create_dir_all(&project_dir)?;
1299 let session_uuid = "44444444-4444-4444-4444-444444444444";
1300 let call_id = "toolu_test_01";
1301
1302 let tool_use_row = serde_json::json!({
1303 "type": "assistant",
1304 "uuid": "u-call",
1305 "sessionId": session_uuid,
1306 "cwd": "/tmp/pond-test",
1307 "timestamp": "2026-05-16T00:00:00.000Z",
1308 "message": {
1309 "role": "assistant",
1310 "content": [{
1311 "type": "tool_use",
1312 "id": call_id,
1313 "name": "Edit",
1314 "input": {"file_path": "/tmp/foo"},
1315 }],
1316 },
1317 });
1318 let tool_result_row = serde_json::json!({
1319 "type": "user",
1320 "uuid": "u-result",
1321 "sessionId": session_uuid,
1322 "cwd": "/tmp/pond-test",
1323 "timestamp": "2026-05-16T00:00:01.000Z",
1324 "message": {
1325 "role": "user",
1326 "content": [{
1327 "type": "tool_result",
1328 "tool_use_id": call_id,
1329 "content": "ok",
1330 }],
1331 },
1332 });
1333 std::fs::write(
1334 project_dir.join(format!("{session_uuid}.jsonl")),
1335 format!("{tool_use_row}\n{tool_result_row}\n"),
1336 )?;
1337
1338 let store_dir = TempDir::new()?;
1339 let store = Store::open_local(store_dir.path()).await?;
1340 let adapter = ClaudeCodeAdapter::new(corpus.path());
1341 let _summary =
1342 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1343 let session = store
1344 .get_session(session_uuid)
1345 .await?
1346 .expect("session ingests");
1347
1348 let mut saw_call = false;
1349 let mut saw_result = false;
1350 for stored in &session.messages {
1351 for part in &stored.parts {
1352 match &part.kind {
1353 PartKind::ToolCall {
1354 call_id: cid, name, ..
1355 } => {
1356 assert_eq!(cid.as_ref().map(|e| e.as_str()), Some(call_id));
1357 assert_eq!(
1358 name.as_ref().map(|e| e.as_str()),
1359 Some("Edit"),
1360 "tool_use carries the name directly"
1361 );
1362 saw_call = true;
1363 }
1364 PartKind::ToolResult {
1365 call_id: cid, name, ..
1366 } => {
1367 assert_eq!(cid.as_ref().map(|e| e.as_str()), Some(call_id));
1368 assert_eq!(
1369 name.as_ref().map(|e| e.as_str()),
1370 Some("Edit"),
1371 "tool_result resolves the name via the per-file map (was 'unknown' pre-2026-05-16)"
1372 );
1373 saw_result = true;
1374 }
1375 _ => {}
1376 }
1377 }
1378 }
1379 assert!(saw_call && saw_result, "both parts must be present");
1380 Ok(())
1381 }
1382
#[test]
fn user_text_provenance_separates_prompts_from_harness_injection() {
    // Each case is (row, user text, expected provenance, label). A plain prompt
    // stays Conversational. A `<task-notification>` wrapper and an `isMeta` row
    // are both Injected.
    let cases = [
        (
            json!({"type": "user", "uuid": "u1"}),
            "please refactor the parser",
            Provenance::Conversational,
            "plain human prompt",
        ),
        (
            json!({"type": "user", "uuid": "u2"}),
            "<task-notification>background task done</task-notification>",
            Provenance::Injected,
            "task-notification wrapper",
        ),
        (
            json!({"type": "user", "uuid": "u3", "isMeta": true}),
            "expanded skill body",
            Provenance::Injected,
            "isMeta row",
        ),
    ];
    for (row, text, expected, label) in cases {
        // `&row` (not the mis-encoded `¬ification`): the helper borrows the row.
        assert_eq!(user_text_provenance(&row, text), expected, "case: {label}");
    }
}
1409
1410 #[tokio::test(flavor = "multi_thread")]
1414 async fn task_notification_message_yields_injected_parts() -> anyhow::Result<()> {
1415 let corpus = TempDir::new()?;
1416 let project_dir = corpus.path().join("-tmp-pond-test");
1417 std::fs::create_dir_all(&project_dir)?;
1418 let session_uuid = "66666666-6666-6666-6666-666666666666";
1419 let prompt = serde_json::json!({
1420 "type": "user",
1421 "uuid": "u-prompt",
1422 "sessionId": session_uuid,
1423 "cwd": "/tmp/pond-test",
1424 "timestamp": "2026-05-16T00:00:00.000Z",
1425 "message": {"role": "user", "content": "genuine human prompt"},
1426 });
1427 let notification = serde_json::json!({
1428 "type": "user",
1429 "uuid": "u-notify",
1430 "sessionId": session_uuid,
1431 "cwd": "/tmp/pond-test",
1432 "timestamp": "2026-05-16T00:00:01.000Z",
1433 "message": {
1434 "role": "user",
1435 "content": "<task-notification>a background task finished</task-notification>",
1436 },
1437 });
1438 std::fs::write(
1439 project_dir.join(format!("{session_uuid}.jsonl")),
1440 format!("{prompt}\n{notification}\n"),
1441 )?;
1442
1443 let store_dir = TempDir::new()?;
1444 let store = Store::open_local(store_dir.path()).await?;
1445 let adapter = ClaudeCodeAdapter::new(corpus.path());
1446 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1447
1448 let session = store
1449 .get_session(session_uuid)
1450 .await?
1451 .expect("session ingests");
1452 let mut saw_prompt = false;
1453 let mut saw_notification = false;
1454 for stored in &session.messages {
1455 for part in &stored.parts {
1456 if stored.message.id() == "u-prompt" {
1457 assert_eq!(part.provenance, crate::wire::Provenance::Conversational);
1458 saw_prompt = true;
1459 }
1460 if stored.message.id() == "u-notify" {
1461 assert_eq!(part.provenance, crate::wire::Provenance::Injected);
1462 saw_notification = true;
1463 }
1464 }
1465 }
1466 assert!(saw_prompt && saw_notification, "both messages present");
1467 Ok(())
1468 }
1469
1470 #[tokio::test(flavor = "multi_thread")]
1474 async fn orphan_tool_result_yields_name_none_not_unknown_sentinel() -> anyhow::Result<()> {
1475 let corpus = TempDir::new()?;
1476 let project_dir = corpus.path().join("-tmp-pond-test");
1477 std::fs::create_dir_all(&project_dir)?;
1478 let session_uuid = "55555555-5555-5555-5555-555555555555";
1479
1480 let row = serde_json::json!({
1482 "type": "user",
1483 "uuid": "u-orphan",
1484 "sessionId": session_uuid,
1485 "cwd": "/tmp/pond-test",
1486 "timestamp": "2026-05-16T00:00:00.000Z",
1487 "message": {
1488 "role": "user",
1489 "content": [{
1490 "type": "tool_result",
1491 "tool_use_id": "toolu_orphan",
1492 "content": "result body, no matching call",
1493 }],
1494 },
1495 });
1496 std::fs::write(
1497 project_dir.join(format!("{session_uuid}.jsonl")),
1498 format!("{row}\n"),
1499 )?;
1500
1501 let store_dir = TempDir::new()?;
1502 let store = Store::open_local(store_dir.path()).await?;
1503 let adapter = ClaudeCodeAdapter::new(corpus.path());
1504 let _summary =
1505 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1506 let session = store
1507 .get_session(session_uuid)
1508 .await?
1509 .expect("session ingests");
1510 let mut found = false;
1511 for stored in &session.messages {
1512 for part in &stored.parts {
1513 if let PartKind::ToolResult { name, call_id, .. } = &part.kind {
1514 assert_eq!(call_id.as_ref().map(|e| e.as_str()), Some("toolu_orphan"));
1515 assert!(
1516 name.is_none(),
1517 "orphan tool_result must be name=None, not synthesized 'unknown'",
1518 );
1519 found = true;
1520 }
1521 }
1522 }
1523 assert!(found, "orphan tool_result part must be present");
1524 Ok(())
1526 }
1527}