1use std::path::{Path, PathBuf};
27
28use async_stream::stream;
29use chrono::{DateTime, Utc};
30use serde_json::{Value, json};
31
32use crate::{
33 sessions::IngestEvent,
34 wire::{Message, Part, PartKind, Provenance, ProviderOptions, Session},
35};
36
37use super::{
38 Adapter, AdapterError, AdapterFactory, AdapterYield, AdapterYieldStream, DiscoverFuture, Env,
39 RestoreFidelity, RestoredFile, SkipOracle, SkipReason, compact_json, config_path,
40 empty_options,
41 extract::{bound_value, extract_compact_repr, extract_str},
42 extracted_text, part_id, part_ordinal, raw_record, source_options,
43};
44
/// Adapter name: the factory name, the `source_agent` recorded on every
/// session, and the adapter label attached to every `AdapterError`.
const NAME: &str = "claude-ai-export";

/// Name of the conversation array inside an export (the file in an unpacked
/// directory or the entry in a zip), and of the single file a restore writes.
const CONVERSATIONS_ENTRY: &str = "conversations.json";
50
51pub struct ClaudeAiExportFactory;
55
56impl AdapterFactory for ClaudeAiExportFactory {
57 fn name(&self) -> &'static str {
58 NAME
59 }
60
61 fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
62 Ok(Box::new(ClaudeAiExportAdapter::new(config_path(
63 NAME, config,
64 )?)))
65 }
66
67 fn probe_default(&self, _env: &Env) -> Option<Value> {
68 None
71 }
72
73 fn serialize(
74 &self,
75 session: &crate::sessions::SessionWithMessages,
76 fidelity: RestoreFidelity,
77 ) -> Result<Vec<RestoredFile>, AdapterError> {
78 serialize_session(session, fidelity)
79 }
80}
81
/// Adapter over a single claude.ai export: an unpacked directory, a `.zip`
/// archive, or a bare `conversations.json` file.
#[derive(Debug, Clone)]
pub struct ClaudeAiExportAdapter {
    /// Export location exactly as configured.
    path: PathBuf,
}

impl ClaudeAiExportAdapter {
    /// Creates an adapter that reads the export found at `path`.
    pub fn new(path: impl Into<PathBuf>) -> Self {
        let path: PathBuf = path.into();
        Self { path }
    }
}
94
impl Adapter for ClaudeAiExportAdapter {
    /// Counts the conversations that have a string `uuid` and a non-empty
    /// `chat_messages` array — the same filter `events_with` applies before its
    /// freshness check.
    ///
    /// The export is read and parsed on a blocking thread. Read/parse errors
    /// and a failed join of that task are returned as errors.
    fn discover(&self) -> DiscoverFuture<'_> {
        let path = self.path.clone();
        Box::pin(async move {
            // Reading/parsing the export is synchronous file I/O, so keep it off
            // the async executor.
            tokio::task::spawn_blocking(move || {
                read_conversations(&path).map(|conversations| {
                    conversations
                        .iter()
                        .filter(|conv| {
                            conv.get("uuid").and_then(Value::as_str).is_some()
                                && !messages_of(conv).is_empty()
                        })
                        .count()
                })
            })
            .await
            // Outer `Result` is the join (task panicked or was cancelled); the
            // inner read/parse `Result` becomes the future's output.
            .map_err(join_error)?
        })
    }

    /// Streams ingest events for every conversation in the export.
    ///
    /// For each conversation, in order:
    /// * a missing `uuid` yields a schema error and the conversation is skipped;
    /// * an empty or absent `chat_messages` yields `Skipped { reason: Empty }`;
    /// * if `oracle` reports a previous ingest and the conversation's
    ///   `updated_at` is not later than it, yields `Skipped { reason: Fresh }`;
    /// * a `build_session` failure yields that error and the conversation's
    ///   messages are not emitted;
    /// * otherwise yields the `Session` event followed by every message's events.
    ///
    /// A failure to read/parse the export, or to join the blocking read task,
    /// yields a single error and ends the stream.
    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
        let path = self.path.clone();
        Box::pin(stream! {
            let parsed = tokio::task::spawn_blocking(move || read_conversations(&path)).await;
            // Either failure is fatal for the whole export.
            let conversations = match parsed {
                Ok(Ok(conversations)) => conversations,
                Ok(Err(error)) => { yield Err(error); return; }
                Err(join) => { yield Err(join_error(join)); return; }
            };

            for mut conv in conversations {
                // NOTE(review): `bound_value` lives in `extract`; presumably it caps
                // oversized values before the record is stored — confirm there.
                bound_value(&mut conv);
                // Without a `uuid` the conversation can't be keyed; report and move on.
                let Some(session_id) = conv.get("uuid").and_then(Value::as_str).map(ToOwned::to_owned)
                else {
                    yield Err(AdapterError::schema(
                        NAME,
                        CONVERSATIONS_ENTRY,
                        "conversation missing `uuid`",
                    ));
                    continue;
                };
                // Zero-message conversations never become sessions.
                if messages_of(&conv).is_empty() {
                    yield Ok(AdapterYield::Skipped {
                        session_id: Some(session_id),
                        project: conv
                            .get("account")
                            .and_then(|account| account.get("uuid"))
                            .and_then(Value::as_str)
                            .map(ToOwned::to_owned),
                        reason: SkipReason::Empty,
                    });
                    continue;
                }
                // Unchanged since the last ingest. A missing or unparseable
                // `updated_at` never counts as fresh, so such conversations are
                // always re-ingested.
                if let Some(ingested) = oracle.last_ingested_at(&session_id)
                    && let Some(updated) = rfc3339(&conv, "updated_at")
                    && updated <= ingested
                {
                    // NOTE(review): unlike the Empty skip above, no project is
                    // reported here — confirm that is intentional.
                    yield Ok(AdapterYield::Skipped {
                        session_id: Some(session_id),
                        project: None,
                        reason: SkipReason::Fresh,
                    });
                    continue;
                }

                let session = match build_session(&conv, &session_id) {
                    Ok(session) => session,
                    Err(error) => { yield Err(error); continue; }
                };
                // Messages without their own valid `created_at` fall back to the
                // conversation's.
                let created_at = session.created_at;
                yield Ok(AdapterYield::Event(IngestEvent::Session(session)));

                for (index, message) in messages_of(&conv).iter().enumerate() {
                    for event in message_events(&session_id, message, index, created_at) {
                        yield Ok(AdapterYield::Event(event));
                    }
                }
            }
        })
    }
}
184
185fn join_error(join: tokio::task::JoinError) -> AdapterError {
187 AdapterError::io(
188 NAME,
189 "blocking read task",
190 std::io::Error::other(join.to_string()),
191 )
192}
193
194fn messages_of(conv: &Value) -> &[Value] {
195 conv.get("chat_messages")
196 .and_then(Value::as_array)
197 .map(Vec::as_slice)
198 .unwrap_or(&[])
199}
200
201fn read_conversations(path: &Path) -> Result<Vec<Value>, AdapterError> {
204 use std::io::Read;
205 let io = |location: String, source| AdapterError::io(NAME, location, source);
206
207 let bytes = if path.is_dir() {
208 let file = path.join(CONVERSATIONS_ENTRY);
209 std::fs::read(&file).map_err(|error| io(file.display().to_string(), error))?
210 } else if path.extension().and_then(|ext| ext.to_str()) == Some("zip") {
211 let file =
212 std::fs::File::open(path).map_err(|error| io(path.display().to_string(), error))?;
213 let mut archive = zip::ZipArchive::new(file).map_err(|error| {
214 AdapterError::schema(
215 NAME,
216 path.display().to_string(),
217 format!("bad zip: {error}"),
218 )
219 })?;
220 let mut entry = archive.by_name(CONVERSATIONS_ENTRY).map_err(|error| {
221 AdapterError::schema(
222 NAME,
223 path.display().to_string(),
224 format!("export zip has no `{CONVERSATIONS_ENTRY}`: {error}"),
225 )
226 })?;
227 let hint = entry.size().min(64 * 1024 * 1024) as usize;
231 let mut buf = Vec::with_capacity(hint);
232 entry
233 .read_to_end(&mut buf)
234 .map_err(|error| io(path.display().to_string(), error))?;
235 buf
236 } else {
237 std::fs::read(path).map_err(|error| io(path.display().to_string(), error))?
238 };
239
240 let value: Value = serde_json::from_slice(&bytes)
241 .map_err(|error| AdapterError::parse(NAME, path.display().to_string(), 1, error))?;
242 match value {
243 Value::Array(conversations) => Ok(conversations),
244 _ => Err(AdapterError::schema(
245 NAME,
246 path.display().to_string(),
247 format!("`{CONVERSATIONS_ENTRY}` is not a JSON array"),
248 )),
249 }
250}
251
252fn build_session(conv: &Value, session_id: &str) -> Result<Session, AdapterError> {
253 let created_at = rfc3339(conv, "created_at").ok_or_else(|| {
254 AdapterError::schema(
255 NAME,
256 session_id.to_owned(),
257 "conversation missing/invalid `created_at`",
258 )
259 })?;
260 let project = conv
263 .get("account")
264 .and_then(|account| extract_str(account, "uuid"))
265 .filter(|uuid| !uuid.trim().is_empty())
268 .ok_or_else(|| {
269 AdapterError::schema(
270 NAME,
271 session_id.to_owned(),
272 "conversation missing/empty `account.uuid` for the project",
273 )
274 })?;
275
276 let mut options = source_options(NAME, conv);
277 if let Some(source) = options.get_mut("source").and_then(Value::as_object_mut) {
278 for key in ["name", "summary", "updated_at"] {
279 if let Some(value) = conv.get(key) {
280 source.insert(key.to_owned(), value.clone());
281 }
282 }
283 }
284
285 Ok(Session {
286 id: session_id.to_owned(),
287 parent_session_id: None,
288 parent_message_id: None,
289 source_agent: NAME.to_owned(),
290 created_at,
291 project,
292 options,
293 })
294}
295
296fn message_events(
297 session_id: &str,
298 message: &Value,
299 index: usize,
300 default_ts: DateTime<Utc>,
301) -> Vec<IngestEvent> {
302 let message_id = message
306 .get("uuid")
307 .and_then(Value::as_str)
308 .map_or_else(|| format!("{session_id}:{index}"), ToOwned::to_owned);
309 let timestamp = rfc3339(message, "created_at").unwrap_or(default_ts);
310 let blocks = message
311 .get("content")
312 .and_then(Value::as_array)
313 .map(Vec::as_slice)
314 .unwrap_or(&[]);
315 let sender = message.get("sender").and_then(Value::as_str);
316 let all_tool_results = !blocks.is_empty() && blocks.iter().all(is_tool_result);
317
318 let parts: Vec<Part> = blocks
319 .iter()
320 .enumerate()
321 .map(|(ordinal, block)| content_part(session_id, &message_id, ordinal, block))
322 .collect();
323
324 let options = message_options(message);
325 let header = match (sender, all_tool_results) {
326 (Some("human"), true) => Message::Tool {
328 id: message_id.clone(),
329 session_id: session_id.to_owned(),
330 timestamp,
331 options,
332 },
333 (Some("assistant"), _) => Message::Assistant {
334 id: message_id.clone(),
335 session_id: session_id.to_owned(),
336 timestamp,
337 options,
338 },
339 _ => Message::User {
342 id: message_id.clone(),
343 session_id: session_id.to_owned(),
344 timestamp,
345 options,
346 },
347 };
348
349 let mut events = Vec::with_capacity(parts.len() + 1);
350 events.push(IngestEvent::Message(header));
351 events.extend(parts.into_iter().map(IngestEvent::Part));
352 events
353}
354
355fn content_part(session_id: &str, message_id: &str, ordinal: usize, block: &Value) -> Part {
356 let (provenance, kind) = match block.get("type").and_then(Value::as_str) {
357 Some("text") => (
358 Provenance::Conversational,
359 PartKind::Text {
360 text: extract_str(block, "text"),
361 },
362 ),
363 Some("thinking") => (
364 Provenance::Conversational,
365 PartKind::Reasoning {
366 text: extract_str(block, "thinking"),
367 },
368 ),
369 Some("tool_use") => (
370 Provenance::Conversational,
371 PartKind::ToolCall {
372 call_id: extract_str(block, "id"),
373 name: extract_str(block, "name"),
374 params: block.get("input").cloned().unwrap_or(Value::Null),
375 provider_executed: true,
376 },
377 ),
378 Some("tool_result") => (
379 Provenance::Injected,
380 PartKind::ToolResult {
381 call_id: None,
386 name: extract_str(block, "name"),
387 is_failure: block
388 .get("is_error")
389 .and_then(Value::as_bool)
390 .unwrap_or(false),
391 result: block.get("content").cloned().unwrap_or(Value::Null),
392 },
393 ),
394 _ => (
397 Provenance::Conversational,
398 PartKind::Text {
399 text: Some(extract_compact_repr(block)),
400 },
401 ),
402 };
403 Part {
404 session_id: session_id.to_owned(),
405 id: part_id(message_id, ordinal),
406 message_id: message_id.to_owned(),
407 ordinal: part_ordinal(ordinal),
408 provenance,
409 options: empty_options(),
410 kind,
411 }
412}
413
414fn message_options(message: &Value) -> ProviderOptions {
415 let mut options = source_options(NAME, message);
416 if let Some(source) = options.get_mut("source").and_then(Value::as_object_mut) {
417 for key in ["sender", "updated_at", "attachments", "files"] {
418 if let Some(value) = message.get(key) {
419 source.insert(key.to_owned(), value.clone());
420 }
421 }
422 }
423 options
424}
425
426fn is_tool_result(block: &Value) -> bool {
427 block.get("type").and_then(Value::as_str) == Some("tool_result")
428}
429
430fn rfc3339(value: &Value, key: &str) -> Option<DateTime<Utc>> {
431 value
432 .get(key)
433 .and_then(Value::as_str)
434 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
435 .map(|dt| dt.with_timezone(&Utc))
436}
437
438fn serialize_session(
439 session: &crate::sessions::SessionWithMessages,
440 fidelity: RestoreFidelity,
441) -> Result<Vec<RestoredFile>, AdapterError> {
442 let conversation = match fidelity {
447 RestoreFidelity::Native => raw_record(&session.session.options),
448 RestoreFidelity::Foreign => None,
449 };
450 let actual_fidelity = if conversation.is_some() {
451 RestoreFidelity::Native
452 } else {
453 RestoreFidelity::Foreign
454 };
455 let conversation = conversation.unwrap_or_else(|| foreign_conversation(session));
456
457 Ok(vec![RestoredFile::new(
458 PathBuf::from(CONVERSATIONS_ENTRY),
459 serde_json::to_vec(&Value::Array(vec![conversation])).map_err(|error| {
460 AdapterError::schema(
461 NAME,
462 &session.session.id,
463 format!("json encode failed: {error}"),
464 )
465 })?,
466 actual_fidelity,
467 )])
468}
469
470fn foreign_conversation(session: &crate::sessions::SessionWithMessages) -> Value {
473 let chat_messages: Vec<Value> = session
474 .messages
475 .iter()
476 .map(|message| {
477 let sender = match message.message {
478 Message::Assistant { .. } => "assistant",
479 _ => "human",
480 };
481 json!({
482 "uuid": message.message.id(),
483 "sender": sender,
484 "created_at": message.message.timestamp().to_rfc3339(),
485 "content": message.parts.iter().map(content_block).collect::<Vec<_>>(),
486 })
487 })
488 .collect();
489 json!({
490 "uuid": session.session.id,
491 "account": { "uuid": &*session.session.project },
492 "created_at": session.session.created_at.to_rfc3339(),
493 "chat_messages": chat_messages,
494 })
495}
496
497fn content_block(part: &Part) -> Value {
498 match &part.kind {
499 PartKind::Text { text } => json!({ "type": "text", "text": extracted_text(text) }),
500 PartKind::Reasoning { text } => {
501 json!({ "type": "thinking", "thinking": extracted_text(text) })
502 }
503 PartKind::ToolCall {
504 call_id,
505 name,
506 params,
507 ..
508 } => json!({
509 "type": "tool_use",
510 "id": extracted_text(call_id),
511 "name": extracted_text(name),
512 "input": params,
513 }),
514 PartKind::ToolResult {
515 name,
516 is_failure,
517 result,
518 ..
519 } => json!({
520 "type": "tool_result",
521 "name": extracted_text(name),
522 "is_error": is_failure,
523 "content": result,
524 }),
525 other => json!({
526 "type": "text",
527 "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null)),
528 }),
529 }
530}
531
#[cfg(test)]
mod tests {
    // Tests opt out of the `expect_used` / `unwrap_used` clippy lints: a panic
    // is the failure signal here.
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use crate::{handlers::ingest_adapter, sessions::Store};
    use tempfile::TempDir;

    /// Unpacked export fixture (a directory containing `conversations.json`).
    const FIXTURE_DIR: &str = "tests/fixtures/adapter/claude_ai_export";
    /// `account.uuid` that every ingested fixture session must carry as its project.
    const ACCOUNT: &str = "ffffffff-ffff-ffff-ffff-ffffffffffff";
    /// Fixture conversation containing tool_use / tool_result blocks.
    const TOOL_CONV: &str = "33333333-3333-3333-3333-333333333333";
    /// Fixture conversation whose `name` is empty; it must still ingest.
    const EMPTY_NAME_CONV: &str = "44444444-4444-4444-4444-444444444444";
    /// Fixture conversation with no messages; it must not become a session.
    const ZERO_MESSAGE_CONV: &str = "55555555-5555-5555-5555-555555555555";

    /// The factory never autodiscovers an export location.
    #[test]
    fn probe_default_returns_none_no_autodiscovery() {
        assert!(
            ClaudeAiExportFactory
                .probe_default(&Env::with_home("/tmp"))
                .is_none()
        );
    }

    /// Ingests the fixture directory and checks session selection, projects,
    /// and the mapping of blocks to part kinds and message roles.
    #[tokio::test(flavor = "multi_thread")]
    async fn ingests_export_directory_into_canonical_shape() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let summary = ingest_adapter(
            &store,
            &ClaudeAiExportAdapter::new(FIXTURE_DIR),
            &crate::adapter::NoopOracle,
            |_| {},
        )
        .await?;
        assert_eq!(summary.dropped_sessions, 0);

        let ids = store.session_ids().await?;
        assert_eq!(ids.len(), 4, "the 0-message conversation is skipped");
        assert!(
            !ids.iter().any(|id| id == ZERO_MESSAGE_CONV),
            "0-message conversation must not become a session",
        );
        assert!(
            ids.iter().any(|id| id == EMPTY_NAME_CONV),
            "an empty-name conversation still ingests (title can't gate it)",
        );

        for id in &ids {
            let session = store.get_session(id).await?.expect("round-trips");
            assert_eq!(session.session.source_agent, NAME);
            assert_eq!(
                &*session.session.project, ACCOUNT,
                "spec.md#model-project-non-empty: project = account.uuid",
            );
        }

        let tool = store
            .get_session(TOOL_CONV)
            .await?
            .expect("tool conversation");
        // Collect what the tool conversation produced: a web_search ToolCall,
        // any Reasoning part, its ToolResult's (call_id, name), and whether any
        // message was classified as Tool.
        let mut saw_call = false;
        let mut saw_reasoning = false;
        let mut tool_result = None;
        let mut saw_tool_message = false;
        for stored in &tool.messages {
            if matches!(stored.message, Message::Tool { .. }) {
                saw_tool_message = true;
            }
            for part in &stored.parts {
                match &part.kind {
                    PartKind::ToolCall { name, .. }
                        if name.as_deref().map(String::as_str) == Some("web_search") =>
                    {
                        saw_call = true;
                    }
                    PartKind::Reasoning { .. } => saw_reasoning = true,
                    PartKind::ToolResult { call_id, name, .. } => {
                        tool_result = Some((call_id.as_deref().cloned(), name.as_deref().cloned()));
                    }
                    _ => {}
                }
            }
        }
        // Reasoning may also come from the separate thinking conversation.
        let thinking = store
            .get_session("22222222-2222-2222-2222-222222222222")
            .await?
            .expect("thinking conversation");
        for stored in &thinking.messages {
            for part in &stored.parts {
                if matches!(part.kind, PartKind::Reasoning { .. }) {
                    saw_reasoning = true;
                }
            }
        }
        assert!(saw_call, "tool_use -> ToolCall named web_search");
        assert!(saw_reasoning, "thinking -> Reasoning");
        assert!(
            saw_tool_message,
            "a human turn of pure tool_result is a Tool message",
        );
        let (call_id, name) = tool_result.expect("tool conversation has a ToolResult");
        assert_eq!(
            name.as_deref(),
            Some("web_search"),
            "tool_result name comes straight off the block",
        );
        assert_eq!(
            call_id, None,
            "the export carries no tool_use_id on tool_result, so call_id is honestly None",
        );
        Ok(())
    }

    /// A message without `uuid` is kept under the synthetic id
    /// `"{session_id}:{index}"` rather than dropped.
    #[test]
    fn uuid_less_message_ingests_under_synthetic_id() {
        let ts = DateTime::parse_from_rfc3339("2026-01-01T00:00:00Z")
            .unwrap()
            .with_timezone(&Utc);
        let message = json!({
            "sender": "human",
            "content": [{ "type": "text", "text": "no uuid here" }],
        });
        let events = message_events("conv-xyz", &message, 3, ts);
        assert_eq!(
            events.len(),
            2,
            "uuid-less message still emits its Message + Part, not dropped",
        );
        match &events[0] {
            IngestEvent::Message(message) => {
                assert_eq!(message.id(), "conv-xyz:3", "deterministic synthetic id");
            }
            _ => panic!("first event must be the Message"),
        }
    }

    /// Packs the fixture's `conversations.json` into a zip and checks that the
    /// zip path ingests the same number of sessions as the directory.
    #[tokio::test(flavor = "multi_thread")]
    async fn ingests_export_zip() -> anyhow::Result<()> {
        use std::io::Write;
        let temp = TempDir::new()?;
        let zip_path = temp.path().join("data-2026-01-15-00-00-00-batch-0000.zip");
        let conversations = std::fs::read(format!("{FIXTURE_DIR}/conversations.json"))?;
        // Scope the writer so the archive is finished and closed before ingest.
        {
            let file = std::fs::File::create(&zip_path)?;
            let mut zip = zip::ZipWriter::new(file);
            zip.start_file(
                "conversations.json",
                zip::write::SimpleFileOptions::default(),
            )?;
            zip.write_all(&conversations)?;
            zip.finish()?;
        }

        let store = Store::open_local(temp.path().join("store")).await?;
        ingest_adapter(
            &store,
            &ClaudeAiExportAdapter::new(&zip_path),
            &crate::adapter::NoopOracle,
            |_| {},
        )
        .await?;
        assert_eq!(
            store.session_ids().await?.len(),
            4,
            "the same four sessions ingest from the zip",
        );
        Ok(())
    }

    /// Native restore writes a one-conversation `conversations.json`; ingesting
    /// that file again reproduces the same number of messages.
    #[tokio::test(flavor = "multi_thread")]
    async fn native_restore_round_trips_one_conversation() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path().join("store")).await?;
        ingest_adapter(
            &store,
            &ClaudeAiExportAdapter::new(FIXTURE_DIR),
            &crate::adapter::NoopOracle,
            |_| {},
        )
        .await?;
        let session = store
            .get_session(TOOL_CONV)
            .await?
            .expect("tool conversation");

        let files = ClaudeAiExportFactory.serialize(&session, RestoreFidelity::Native)?;
        assert_eq!(files.len(), 1);
        assert_eq!(
            files[0].relative_path,
            std::path::Path::new(CONVERSATIONS_ENTRY)
        );
        let value: Value = serde_json::from_slice(&files[0].bytes)?;
        let array = value.as_array().expect("conversations.json is an array");
        assert_eq!(
            array.len(),
            1,
            "per-session restore is a one-conversation export"
        );
        assert_eq!(
            array[0].get("uuid").and_then(Value::as_str),
            Some(TOOL_CONV),
        );

        // Re-ingest the restored file into a fresh store.
        let restore_dir = temp.path().join("restore");
        std::fs::create_dir_all(&restore_dir)?;
        std::fs::write(restore_dir.join(CONVERSATIONS_ENTRY), &files[0].bytes)?;
        let restore_store = Store::open_local(temp.path().join("restore-store")).await?;
        ingest_adapter(
            &restore_store,
            &ClaudeAiExportAdapter::new(&restore_dir),
            &crate::adapter::NoopOracle,
            |_| {},
        )
        .await?;
        let restored = restore_store
            .get_session(TOOL_CONV)
            .await?
            .expect("restored");
        assert_eq!(
            restored.messages.len(),
            session.messages.len(),
            "native restore replays every message",
        );
        Ok(())
    }
}
767}