1use std::path::{Path, PathBuf};
27
28use async_stream::stream;
29use chrono::{DateTime, Utc};
30use serde_json::{Value, json};
31
32use crate::{
33 sessions::IngestEvent,
34 wire::{Message, Part, PartKind, Provenance, ProviderOptions, Session},
35};
36
37use super::{
38 Adapter, AdapterError, AdapterFactory, AdapterYield, AdapterYieldStream, DiscoverFuture, Env,
39 RestoreFidelity, RestoredFile, SkipOracle, SkipReason, compact_json, config_path,
40 empty_options,
41 extract::{bound_value, extract_compact_repr, extract_str},
42 extracted_text, part_id, part_ordinal, raw_record, source_options,
43};
44
/// Adapter name: returned by the factory, stamped on every session as `source_agent`,
/// and passed as the adapter identifier when building `AdapterError` values.
const NAME: &str = "claude-ai-export";

/// File name of the conversations payload: looked up inside an export directory or zip
/// archive, and used as the relative path of the file produced by a restore.
const CONVERSATIONS_ENTRY: &str = "conversations.json";
50
/// Stateless factory for the `claude-ai-export` adapter.
///
/// The type carries no data; the derives make it printable in diagnostics and trivially
/// constructible/copyable, matching the `Debug`/`Clone` derives on the adapter it builds.
#[derive(Debug, Clone, Copy, Default)]
pub struct ClaudeAiExportFactory;
55
56impl AdapterFactory for ClaudeAiExportFactory {
57 fn name(&self) -> &'static str {
58 NAME
59 }
60
61 fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
62 Ok(Box::new(ClaudeAiExportAdapter::new(config_path(
63 NAME, config,
64 )?)))
65 }
66
67 fn probe_default(&self, _env: &Env) -> Option<Value> {
68 None
71 }
72
73 fn serialize(
74 &self,
75 session: &crate::sessions::SessionWithMessages,
76 fidelity: RestoreFidelity,
77 ) -> Result<Vec<RestoredFile>, AdapterError> {
78 serialize_session(session, fidelity)
79 }
80}
81
/// Adapter that reads conversations from a claude.ai data export.
#[derive(Debug, Clone)]
pub struct ClaudeAiExportAdapter {
    /// Location handed to `read_conversations`: an export directory, a `.zip` archive,
    /// or a JSON file read directly.
    path: PathBuf,
}
88
89impl ClaudeAiExportAdapter {
90 pub fn new(path: impl Into<PathBuf>) -> Self {
91 Self { path: path.into() }
92 }
93}
94
95impl Adapter for ClaudeAiExportAdapter {
96 fn discover(&self) -> DiscoverFuture<'_> {
97 let path = self.path.clone();
98 Box::pin(async move {
99 tokio::task::spawn_blocking(move || {
100 read_conversations(&path).map(|conversations| {
101 conversations
102 .iter()
103 .filter(|conv| {
106 conv.get("uuid").and_then(Value::as_str).is_some()
107 && !messages_of(conv).is_empty()
108 })
109 .count()
110 })
111 })
112 .await
113 .map_err(join_error)?
114 })
115 }
116
117 fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
118 let path = self.path.clone();
119 Box::pin(stream! {
120 let parsed = tokio::task::spawn_blocking(move || read_conversations(&path)).await;
124 let conversations = match parsed {
125 Ok(Ok(conversations)) => conversations,
126 Ok(Err(error)) => { yield Err(error); return; }
127 Err(join) => { yield Err(join_error(join)); return; }
128 };
129
130 for mut conv in conversations {
131 bound_value(&mut conv);
132 let Some(session_id) = conv.get("uuid").and_then(Value::as_str).map(ToOwned::to_owned)
133 else {
134 yield Err(AdapterError::schema(
135 NAME,
136 CONVERSATIONS_ENTRY,
137 "conversation missing `uuid`",
138 ));
139 continue;
140 };
141 if messages_of(&conv).is_empty() {
142 yield Ok(AdapterYield::Skipped {
144 session_id: Some(session_id),
145 project: conv
146 .get("account")
147 .and_then(|account| account.get("uuid"))
148 .and_then(Value::as_str)
149 .map(ToOwned::to_owned),
150 reason: SkipReason::Empty,
151 });
152 continue;
153 }
154 if let Some(ingested) = oracle.last_ingested_at(&session_id)
157 && let Some(updated) = rfc3339(&conv, "updated_at")
158 && updated <= ingested
159 {
160 yield Ok(AdapterYield::Skipped {
161 session_id: Some(session_id),
162 project: None,
163 reason: SkipReason::Fresh,
164 });
165 continue;
166 }
167
168 let session = match build_session(&conv, &session_id) {
169 Ok(session) => session,
170 Err(error) => { yield Err(error); continue; }
171 };
172 let created_at = session.created_at;
173 yield Ok(AdapterYield::Event(IngestEvent::Session(session)));
174
175 for (index, message) in messages_of(&conv).iter().enumerate() {
176 for event in message_events(&session_id, message, index, created_at) {
177 yield Ok(AdapterYield::Event(event));
178 }
179 }
180 }
181 })
182 }
183}
184
185fn join_error(join: tokio::task::JoinError) -> AdapterError {
187 AdapterError::io(
188 NAME,
189 "blocking read task",
190 std::io::Error::other(join.to_string()),
191 )
192}
193
194fn messages_of(conv: &Value) -> &[Value] {
195 conv.get("chat_messages")
196 .and_then(Value::as_array)
197 .map(Vec::as_slice)
198 .unwrap_or(&[])
199}
200
201fn read_conversations(path: &Path) -> Result<Vec<Value>, AdapterError> {
204 use std::io::Read;
205 let io = |location: String, source| AdapterError::io(NAME, location, source);
206
207 let bytes = if path.is_dir() {
208 let file = path.join(CONVERSATIONS_ENTRY);
209 std::fs::read(&file).map_err(|error| io(file.display().to_string(), error))?
210 } else if path.extension().and_then(|ext| ext.to_str()) == Some("zip") {
211 let file =
212 std::fs::File::open(path).map_err(|error| io(path.display().to_string(), error))?;
213 let mut archive = zip::ZipArchive::new(file).map_err(|error| {
214 AdapterError::schema(
215 NAME,
216 path.display().to_string(),
217 format!("bad zip: {error}"),
218 )
219 })?;
220 let mut entry = archive.by_name(CONVERSATIONS_ENTRY).map_err(|error| {
221 AdapterError::schema(
222 NAME,
223 path.display().to_string(),
224 format!("export zip has no `{CONVERSATIONS_ENTRY}`: {error}"),
225 )
226 })?;
227 let hint = entry.size().min(64 * 1024 * 1024) as usize;
231 let mut buf = Vec::with_capacity(hint);
232 entry
233 .read_to_end(&mut buf)
234 .map_err(|error| io(path.display().to_string(), error))?;
235 buf
236 } else {
237 std::fs::read(path).map_err(|error| io(path.display().to_string(), error))?
238 };
239
240 let value: Value = serde_json::from_slice(&bytes)
241 .map_err(|error| AdapterError::parse(NAME, path.display().to_string(), 1, error))?;
242 match value {
243 Value::Array(conversations) => Ok(conversations),
244 _ => Err(AdapterError::schema(
245 NAME,
246 path.display().to_string(),
247 format!("`{CONVERSATIONS_ENTRY}` is not a JSON array"),
248 )),
249 }
250}
251
252fn build_session(conv: &Value, session_id: &str) -> Result<Session, AdapterError> {
253 let created_at = rfc3339(conv, "created_at").ok_or_else(|| {
254 AdapterError::schema(
255 NAME,
256 session_id.to_owned(),
257 "conversation missing/invalid `created_at`",
258 )
259 })?;
260 let project = conv
263 .get("account")
264 .and_then(|account| extract_str(account, "uuid"))
265 .filter(|uuid| !uuid.trim().is_empty())
268 .ok_or_else(|| {
269 AdapterError::schema(
270 NAME,
271 session_id.to_owned(),
272 "conversation missing/empty `account.uuid` for the project",
273 )
274 })?;
275
276 let mut options = source_options(NAME, conv);
277 if let Some(source) = options.get_mut("source").and_then(Value::as_object_mut) {
278 for key in ["name", "summary", "updated_at"] {
279 if let Some(value) = conv.get(key) {
280 source.insert(key.to_owned(), value.clone());
281 }
282 }
283 }
284
285 Ok(Session {
286 id: session_id.to_owned(),
287 parent_session_id: None,
288 parent_message_id: None,
289 source_agent: NAME.to_owned(),
290 created_at,
291 project,
292 options,
293 })
294}
295
296fn message_events(
297 session_id: &str,
298 message: &Value,
299 index: usize,
300 default_ts: DateTime<Utc>,
301) -> Vec<IngestEvent> {
302 let message_id = message
306 .get("uuid")
307 .and_then(Value::as_str)
308 .map_or_else(|| format!("{session_id}:{index}"), ToOwned::to_owned);
309 let timestamp = rfc3339(message, "created_at").unwrap_or(default_ts);
310 let blocks = message
311 .get("content")
312 .and_then(Value::as_array)
313 .map(Vec::as_slice)
314 .unwrap_or(&[]);
315 let sender = message.get("sender").and_then(Value::as_str);
316 let all_tool_results = !blocks.is_empty() && blocks.iter().all(is_tool_result);
317
318 let parts: Vec<Part> = blocks
319 .iter()
320 .enumerate()
321 .map(|(ordinal, block)| content_part(session_id, &message_id, ordinal, block))
322 .collect();
323
324 let options = message_options(message);
325 let header = match (sender, all_tool_results) {
326 (Some("human"), true) => Message::Tool {
328 id: message_id.clone(),
329 session_id: session_id.to_owned(),
330 timestamp,
331 options,
332 },
333 (Some("assistant"), _) => Message::Assistant {
334 id: message_id.clone(),
335 session_id: session_id.to_owned(),
336 timestamp,
337 options,
338 },
339 _ => Message::User {
342 id: message_id.clone(),
343 session_id: session_id.to_owned(),
344 timestamp,
345 options,
346 },
347 };
348
349 let mut events = Vec::with_capacity(parts.len() + 1);
350 events.push(IngestEvent::Message(header));
351 events.extend(parts.into_iter().map(IngestEvent::Part));
352 events
353}
354
355fn content_part(session_id: &str, message_id: &str, ordinal: usize, block: &Value) -> Part {
356 let (provenance, kind) = match block.get("type").and_then(Value::as_str) {
357 Some("text") => (
358 Provenance::Conversational,
359 PartKind::Text {
360 text: extract_str(block, "text"),
361 },
362 ),
363 Some("thinking") => (
364 Provenance::Conversational,
365 PartKind::Reasoning {
366 text: extract_str(block, "thinking"),
367 },
368 ),
369 Some("tool_use") => (
370 Provenance::Conversational,
371 PartKind::ToolCall {
372 call_id: extract_str(block, "id"),
373 name: extract_str(block, "name"),
374 params: block.get("input").cloned().unwrap_or(Value::Null),
375 provider_executed: true,
376 },
377 ),
378 Some("tool_result") => (
379 Provenance::Injected,
380 PartKind::ToolResult {
381 call_id: None,
386 name: extract_str(block, "name"),
387 is_failure: block
388 .get("is_error")
389 .and_then(Value::as_bool)
390 .unwrap_or(false),
391 result: block.get("content").cloned().unwrap_or(Value::Null),
392 },
393 ),
394 _ => (
397 Provenance::Conversational,
398 PartKind::Text {
399 text: Some(extract_compact_repr(block)),
400 },
401 ),
402 };
403 Part {
404 session_id: session_id.to_owned(),
405 id: part_id(message_id, ordinal),
406 message_id: message_id.to_owned(),
407 ordinal: part_ordinal(ordinal),
408 provenance,
409 options: empty_options(),
410 kind,
411 }
412}
413
414fn message_options(message: &Value) -> ProviderOptions {
415 let mut options = source_options(NAME, message);
416 if let Some(source) = options.get_mut("source").and_then(Value::as_object_mut) {
417 for key in ["sender", "updated_at", "attachments", "files"] {
418 if let Some(value) = message.get(key) {
419 source.insert(key.to_owned(), value.clone());
420 }
421 }
422 }
423 options
424}
425
426fn is_tool_result(block: &Value) -> bool {
427 block.get("type").and_then(Value::as_str) == Some("tool_result")
428}
429
430fn rfc3339(value: &Value, key: &str) -> Option<DateTime<Utc>> {
431 value
432 .get(key)
433 .and_then(Value::as_str)
434 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
435 .map(|dt| dt.with_timezone(&Utc))
436}
437
438fn serialize_session(
439 session: &crate::sessions::SessionWithMessages,
440 fidelity: RestoreFidelity,
441) -> Result<Vec<RestoredFile>, AdapterError> {
442 let conversation = match fidelity {
447 RestoreFidelity::Native => raw_record(&session.session.options),
448 RestoreFidelity::Foreign => None,
449 };
450 let actual_fidelity = if conversation.is_some() {
451 RestoreFidelity::Native
452 } else {
453 RestoreFidelity::Foreign
454 };
455 let conversation = conversation.unwrap_or_else(|| foreign_conversation(session));
456
457 Ok(vec![RestoredFile::new(
458 PathBuf::from(CONVERSATIONS_ENTRY),
459 serde_json::to_vec(&Value::Array(vec![conversation])).map_err(|error| {
460 AdapterError::schema(
461 NAME,
462 &session.session.id,
463 format!("json encode failed: {error}"),
464 )
465 })?,
466 actual_fidelity,
467 )])
468}
469
470fn foreign_conversation(session: &crate::sessions::SessionWithMessages) -> Value {
473 let chat_messages: Vec<Value> = session
474 .messages
475 .iter()
476 .map(|message| {
477 let sender = match message.message {
478 Message::Assistant { .. } => "assistant",
479 _ => "human",
480 };
481 json!({
482 "uuid": message.message.id(),
483 "sender": sender,
484 "created_at": message.message.timestamp().to_rfc3339(),
485 "content": message.parts.iter().map(content_block).collect::<Vec<_>>(),
486 })
487 })
488 .collect();
489 json!({
490 "uuid": session.session.id,
491 "account": { "uuid": &*session.session.project },
492 "created_at": session.session.created_at.to_rfc3339(),
493 "chat_messages": chat_messages,
494 })
495}
496
497fn content_block(part: &Part) -> Value {
498 match &part.kind {
499 PartKind::Text { text } => json!({ "type": "text", "text": extracted_text(text) }),
500 PartKind::Reasoning { text } => {
501 json!({ "type": "thinking", "thinking": extracted_text(text) })
502 }
503 PartKind::ToolCall {
504 call_id,
505 name,
506 params,
507 ..
508 } => json!({
509 "type": "tool_use",
510 "id": extracted_text(call_id),
511 "name": extracted_text(name),
512 "input": params,
513 }),
514 PartKind::ToolResult {
515 name,
516 is_failure,
517 result,
518 ..
519 } => json!({
520 "type": "tool_result",
521 "name": extracted_text(name),
522 "is_error": is_failure,
523 "content": result,
524 }),
525 other => json!({
526 "type": "text",
527 "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null)),
528 }),
529 }
530}
531
#[cfg(test)]
mod tests {
    // Test code reports failures by panicking, so `expect`/`unwrap` are allowed here.
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use crate::{handlers::ingest_adapter, sessions::Store};
    use tempfile::TempDir;

    // Directory holding the fixture export used by the ingest tests.
    const FIXTURE_DIR: &str = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/fixtures/adapter/claude_ai_export"
    );
    // Account uuid every ingested fixture session is expected to use as its project.
    const ACCOUNT: &str = "ffffffff-ffff-ffff-ffff-ffffffffffff";
    // Fixture conversation expected to contain a tool call and a tool result.
    const TOOL_CONV: &str = "33333333-3333-3333-3333-333333333333";
    // Fixture conversation with an empty name; it must still ingest.
    const EMPTY_NAME_CONV: &str = "44444444-4444-4444-4444-444444444444";
    // Fixture conversation without messages; it must be skipped.
    const ZERO_MESSAGE_CONV: &str = "55555555-5555-5555-5555-555555555555";

    // The factory never proposes a default configuration.
    #[test]
    fn probe_default_returns_none_no_autodiscovery() {
        assert!(
            ClaudeAiExportFactory
                .probe_default(&Env::with_home("/tmp"))
                .is_none()
        );
    }

    // Ingests the fixture directory and checks which sessions land in the store and how
    // tool/thinking blocks are mapped onto canonical message and part kinds.
    #[tokio::test(flavor = "multi_thread")]
    async fn ingests_export_directory_into_canonical_shape() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let summary = ingest_adapter(
            &store,
            &ClaudeAiExportAdapter::new(FIXTURE_DIR),
            &crate::adapter::NoopOracle,
            |_| {},
        )
        .await?;
        assert_eq!(summary.dropped_sessions, 0);

        let ids = store.session_ids().await?;
        assert_eq!(ids.len(), 4, "the 0-message conversation is skipped");
        assert!(
            !ids.iter().any(|id| id == ZERO_MESSAGE_CONV),
            "0-message conversation must not become a session",
        );
        assert!(
            ids.iter().any(|id| id == EMPTY_NAME_CONV),
            "an empty-name conversation still ingests (title can't gate it)",
        );

        // Every stored session carries the adapter name and the account uuid as project.
        for id in &ids {
            let session = store.get_session(id).await?.expect("round-trips");
            assert_eq!(session.session.source_agent, NAME);
            assert_eq!(
                &*session.session.project, ACCOUNT,
                "spec.md#model-project-non-empty: project = account.uuid",
            );
        }

        // Scan the tool conversation for the part and message kinds it should contain.
        let tool = store
            .get_session(TOOL_CONV)
            .await?
            .expect("tool conversation");
        let mut saw_call = false;
        let mut saw_reasoning = false;
        let mut tool_result = None;
        let mut saw_tool_message = false;
        for stored in &tool.messages {
            if matches!(stored.message, Message::Tool { .. }) {
                saw_tool_message = true;
            }
            for part in &stored.parts {
                match &part.kind {
                    PartKind::ToolCall { name, .. }
                        if name.as_deref().map(String::as_str) == Some("web_search") =>
                    {
                        saw_call = true;
                    }
                    PartKind::Reasoning { .. } => saw_reasoning = true,
                    PartKind::ToolResult { call_id, name, .. } => {
                        tool_result = Some((call_id.as_deref().cloned(), name.as_deref().cloned()));
                    }
                    _ => {}
                }
            }
        }
        // A reasoning part found in this second conversation also satisfies `saw_reasoning`.
        let thinking = store
            .get_session("22222222-2222-2222-2222-222222222222")
            .await?
            .expect("thinking conversation");
        for stored in &thinking.messages {
            for part in &stored.parts {
                if matches!(part.kind, PartKind::Reasoning { .. }) {
                    saw_reasoning = true;
                }
            }
        }
        assert!(saw_call, "tool_use -> ToolCall named web_search");
        assert!(saw_reasoning, "thinking -> Reasoning");
        assert!(
            saw_tool_message,
            "a human turn of pure tool_result is a Tool message",
        );
        let (call_id, name) = tool_result.expect("tool conversation has a ToolResult");
        assert_eq!(
            name.as_deref(),
            Some("web_search"),
            "tool_result name comes straight off the block",
        );
        assert_eq!(
            call_id, None,
            "the export carries no tool_use_id on tool_result, so call_id is honestly None",
        );
        Ok(())
    }

    // A message without a `uuid` is kept and identified as `"{session_id}:{index}"`.
    #[test]
    fn uuid_less_message_ingests_under_synthetic_id() {
        let ts = DateTime::parse_from_rfc3339("2026-01-01T00:00:00Z")
            .unwrap()
            .with_timezone(&Utc);
        let message = json!({
            "sender": "human",
            "content": [{ "type": "text", "text": "no uuid here" }],
        });
        let events = message_events("conv-xyz", &message, 3, ts);
        assert_eq!(
            events.len(),
            2,
            "uuid-less message still emits its Message + Part, not dropped",
        );
        match &events[0] {
            IngestEvent::Message(message) => {
                assert_eq!(message.id(), "conv-xyz:3", "deterministic synthetic id");
            }
            _ => panic!("first event must be the Message"),
        }
    }

    // Packs the fixture `conversations.json` into a zip and ingests the archive.
    #[tokio::test(flavor = "multi_thread")]
    async fn ingests_export_zip() -> anyhow::Result<()> {
        use std::io::Write;
        let temp = TempDir::new()?;
        let zip_path = temp.path().join("data-2026-01-15-00-00-00-batch-0000.zip");
        let conversations = std::fs::read(format!("{FIXTURE_DIR}/conversations.json"))?;
        {
            // Scoped so the writer and its file handle are dropped before ingestion.
            let file = std::fs::File::create(&zip_path)?;
            let mut zip = zip::ZipWriter::new(file);
            zip.start_file(
                "conversations.json",
                zip::write::SimpleFileOptions::default(),
            )?;
            zip.write_all(&conversations)?;
            zip.finish()?;
        }

        let store = Store::open_local(temp.path().join("store")).await?;
        ingest_adapter(
            &store,
            &ClaudeAiExportAdapter::new(&zip_path),
            &crate::adapter::NoopOracle,
            |_| {},
        )
        .await?;
        assert_eq!(
            store.session_ids().await?.len(),
            4,
            "the same four sessions ingest from the zip",
        );
        Ok(())
    }

    // Ingest -> native serialize -> re-ingest keeps the conversation id and message count.
    #[tokio::test(flavor = "multi_thread")]
    async fn native_restore_round_trips_one_conversation() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path().join("store")).await?;
        ingest_adapter(
            &store,
            &ClaudeAiExportAdapter::new(FIXTURE_DIR),
            &crate::adapter::NoopOracle,
            |_| {},
        )
        .await?;
        let session = store
            .get_session(TOOL_CONV)
            .await?
            .expect("tool conversation");

        let files = ClaudeAiExportFactory.serialize(&session, RestoreFidelity::Native)?;
        assert_eq!(files.len(), 1);
        assert_eq!(
            files[0].relative_path,
            std::path::Path::new(CONVERSATIONS_ENTRY)
        );
        let value: Value = serde_json::from_slice(&files[0].bytes)?;
        let array = value.as_array().expect("conversations.json is an array");
        assert_eq!(
            array.len(),
            1,
            "per-session restore is a one-conversation export"
        );
        assert_eq!(
            array[0].get("uuid").and_then(Value::as_str),
            Some(TOOL_CONV),
        );

        // Write the restored bytes into a fresh export directory and ingest them again.
        let restore_dir = temp.path().join("restore");
        std::fs::create_dir_all(&restore_dir)?;
        std::fs::write(restore_dir.join(CONVERSATIONS_ENTRY), &files[0].bytes)?;
        let restore_store = Store::open_local(temp.path().join("restore-store")).await?;
        ingest_adapter(
            &restore_store,
            &ClaudeAiExportAdapter::new(&restore_dir),
            &crate::adapter::NoopOracle,
            |_| {},
        )
        .await?;
        let restored = restore_store
            .get_session(TOOL_CONV)
            .await?
            .expect("restored");
        assert_eq!(
            restored.messages.len(),
            session.messages.len(),
            "native restore replays every message",
        );
        Ok(())
    }
}