1use std::{
33 collections::HashMap,
34 path::{Path, PathBuf},
35};
36
37use async_stream::stream;
38use chrono::{DateTime, SecondsFormat, Utc};
39use serde_json::{Value, json};
40use tokio::sync::mpsc;
41use walkdir::WalkDir;
42
43use crate::{
44 sessions::IngestEvent,
45 wire::{FileData, Message, Part, PartKind, Provenance, ProviderOptions, Session},
46};
47
48use super::{
49 Adapter, AdapterError, AdapterFactory, AdapterYield, AdapterYieldStream, DiscoverFuture, Env,
50 RestoreFidelity, RestoredFile, SkipOracle, SkipReason, by_timestamp_then_id, compact_json,
51 config_path, empty_options,
52 extract::{
53 Extracted, Source, bound_value, extract_compact_repr, extract_self_str, extract_str,
54 },
55 extracted_text,
56 jsonl::{RECORD_CAP, peek_last_line},
57 jsonl_bytes, part_id, part_ordinal, raw_record, source_options,
58};
59
/// Adapter name. It is passed to every `AdapterError` constructor and to `source_options`
/// in this file, and stored as each session's `source_agent`.
const NAME: &str = "claude-desktop-app";

/// Capacity of the bounded channel between the blocking reader task and the async stream
/// in `events_with`.
const CHANNEL_CAP: usize = 256;

/// Last path component of the default session root built by `cowork_root`.
const SESSIONS_SUBDIR: &str = "local-agent-mode-sessions";
68
/// Stateless factory for this adapter: it opens [`ClaudeDesktopAppAdapter`]s, probes the
/// default session root, and serializes stored sessions back into files.
pub struct ClaudeDesktopAppFactory;
72
73impl AdapterFactory for ClaudeDesktopAppFactory {
74 fn name(&self) -> &'static str {
75 NAME
76 }
77
78 fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
79 Ok(Box::new(ClaudeDesktopAppAdapter::new(config_path(
80 NAME, config,
81 )?)))
82 }
83
84 fn probe_default(&self, env: &Env) -> Option<Value> {
85 let path = cowork_root(&env.home);
86 path.exists().then(|| json!({ "path": path }))
87 }
88
89 fn serialize(
90 &self,
91 session: &crate::sessions::SessionWithMessages,
92 fidelity: RestoreFidelity,
93 ) -> Result<Vec<RestoredFile>, AdapterError> {
94 serialize_session(session, fidelity)
95 }
96}
97
98fn cowork_root(home: &Path) -> PathBuf {
100 home.join("Library")
101 .join("Application Support")
102 .join("Claude")
103 .join(SESSIONS_SUBDIR)
104}
105
/// Adapter that reads sessions from beneath a single root directory.
#[derive(Debug, Clone)]
pub struct ClaudeDesktopAppAdapter {
    /// Directory that `collect_sessions` walks looking for `audit.jsonl` files.
    root: PathBuf,
}
111
112impl ClaudeDesktopAppAdapter {
113 pub fn new(root: impl Into<PathBuf>) -> Self {
114 Self { root: root.into() }
115 }
116}
117
impl Adapter for ClaudeDesktopAppAdapter {
    /// Resolves to the number of sessions `collect_sessions` finds under the root.
    ///
    /// The directory walk runs on a blocking task; a failed join is converted with
    /// `join_error`, and an error from the walk itself is returned as-is.
    fn discover(&self) -> DiscoverFuture<'_> {
        let root = self.root.clone();
        Box::pin(async move {
            tokio::task::spawn_blocking(move || collect_sessions(&root).map(|files| files.len()))
                .await
                .map_err(join_error)?
        })
    }

    /// Streams ingest events for every session under the root.
    ///
    /// The stream works in three phases: collect the session list, drop sessions that
    /// `oracle` reports as fresh (yielding `AdapterYield::Skipped` for each), then read the
    /// remaining sessions on a blocking task and forward what it sends over a channel.
    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
        let adapter = self.clone();
        Box::pin(stream! {
            // Phase 1: walk the root on a blocking task. A walk error or a failed join
            // is yielded once and ends the stream.
            let files = {
                let root = adapter.root.clone();
                tokio::task::spawn_blocking(move || collect_sessions(&root)).await
            };
            let files = match files {
                Ok(Ok(files)) => files,
                Ok(Err(error)) => { yield Err(error); return; }
                Err(join) => { yield Err(join_error(join)); return; }
            };

            // Phase 2: freshness filter. When the oracle is empty the check is skipped
            // entirely, so no audit file is opened here.
            let mut survivors = Vec::with_capacity(files.len());
            for file in files {
                if !oracle.is_empty() {
                    // Only the timestamp of the last audit line is looked at here.
                    let audit = file.audit_path.clone();
                    let last_ts =
                        match tokio::task::spawn_blocking(move || source_last_ts(&audit)).await {
                            Ok(last_ts) => last_ts,
                            Err(join) => { yield Err(join_error(join)); return; }
                        };
                    if crate::adapter::is_session_fresh(oracle, &file.session_id, last_ts) {
                        yield Ok(AdapterYield::Skipped {
                            session_id: Some(file.session_id.clone()),
                            project: None,
                            reason: SkipReason::Fresh,
                        });
                        continue;
                    }
                }
                survivors.push(file);
            }

            // Phase 3: a blocking task parses the surviving sessions and sends each item
            // through a bounded channel. `tx` is moved into the task, so `recv` returns
            // `None` once the task has finished and dropped it.
            let (tx, mut rx) = mpsc::channel(CHANNEL_CAP);
            let handle = tokio::task::spawn_blocking(move || read_sessions(survivors, &tx));
            while let Some(item) = rx.recv().await {
                yield item;
            }
            // Report a reader task that failed to join as a final error item.
            if let Err(join) = handle.await {
                yield Err(join_error(join));
            }
        })
    }
}
176
177fn source_last_ts(audit_path: &Path) -> Option<i64> {
186 let last_line = peek_last_line(audit_path)?;
187 let record: Value = serde_json::from_str(&last_line).ok()?;
188 Some(record_timestamp(&record)?.timestamp_micros())
189}
190
191fn join_error(join: tokio::task::JoinError) -> AdapterError {
192 AdapterError::io(
193 NAME,
194 "blocking read task",
195 std::io::Error::other(join.to_string()),
196 )
197}
198
/// One session found on disk by `collect_sessions`.
struct CoworkSession {
    /// Name of the directory holding `audit.jsonl` (always starts with `local_`).
    session_id: String,
    /// Path to the session's `audit.jsonl`.
    audit_path: PathBuf,
    /// Path to the metadata file: `<session_id>.json` next to the session directory.
    meta_path: PathBuf,
    /// Session directory relative to the adapter root (the full path if it is not under
    /// the root).
    relative_dir: PathBuf,
}
208
209fn collect_sessions(root: &Path) -> Result<Vec<CoworkSession>, AdapterError> {
213 if !root.exists() {
214 return Ok(Vec::new());
215 }
216 let io = |source| AdapterError::io(NAME, root.display().to_string(), source);
217 let mut out = Vec::new();
218 let walker = WalkDir::new(root).into_iter().filter_entry(|entry| {
219 !(entry.file_type().is_dir()
223 && entry
224 .file_name()
225 .to_str()
226 .is_some_and(|name| name.starts_with('.')))
227 });
228 for entry in walker {
229 let entry = entry.map_err(|error| io(error.into()))?;
230 if entry.file_name() != "audit.jsonl" {
231 continue;
232 }
233 let audit_path = entry.into_path();
234 let Some(dir) = audit_path.parent() else {
235 continue;
236 };
237 let Some(dir_name) = dir.file_name().and_then(|name| name.to_str()) else {
238 continue;
239 };
240 if !dir_name.starts_with("local_") {
244 continue;
245 }
246 let Some(workspace) = dir.parent() else {
247 continue;
248 };
249 let meta_path = workspace.join(format!("{dir_name}.json"));
250 let relative_dir = dir.strip_prefix(root).unwrap_or(dir).to_path_buf();
251 out.push(CoworkSession {
252 session_id: dir_name.to_owned(),
253 audit_path,
254 meta_path,
255 relative_dir,
256 });
257 }
258 out.sort_by(|a, b| a.audit_path.cmp(&b.audit_path));
259 Ok(out)
260}
261
262fn read_sessions(
263 sessions: Vec<CoworkSession>,
264 tx: &mpsc::Sender<Result<AdapterYield, AdapterError>>,
265) {
266 for session in sessions {
267 if !read_one_session(session, tx) {
268 return;
269 }
270 }
271}
272
/// Reads one session (metadata plus audit log) and sends the resulting items through `tx`.
///
/// Returns `false` only when a send fails because the receiver was dropped, which tells
/// the caller to stop. Returns `true` in every other case, including when this session
/// could not be read: those failures are sent as `Err` items and the caller moves on.
fn read_one_session(
    file: CoworkSession,
    tx: &mpsc::Sender<Result<AdapterYield, AdapterError>>,
) -> bool {
    // Sends one item; if the receiver is gone, returns `false` from the enclosing function.
    macro_rules! emit {
        ($item:expr) => {
            if tx.blocking_send($item).is_err() {
                return false;
            }
        };
    }

    // A session whose metadata cannot be read or turned into a `Session` is reported
    // and skipped; none of its audit lines are emitted.
    let meta = match read_json(&file.meta_path) {
        Ok(value) => value,
        Err(error) => {
            emit!(Err(error));
            return true;
        }
    };
    let session = match build_session(&file, &meta) {
        Ok(session) => session,
        Err(error) => {
            emit!(Err(error));
            return true;
        }
    };
    let created_at = session.created_at;
    let session_id = session.id.clone();
    // The session event is sent before any of its messages.
    emit!(Ok(AdapterYield::Event(IngestEvent::Session(session))));

    // The whole audit file is loaded into memory and must be valid UTF-8 as a whole.
    let bytes = match std::fs::read(&file.audit_path) {
        Ok(bytes) => bytes,
        Err(error) => {
            emit!(Err(AdapterError::io(
                NAME,
                file.audit_path.display().to_string(),
                error
            )));
            return true;
        }
    };
    let text = match std::str::from_utf8(&bytes) {
        Ok(text) => text,
        Err(_) => {
            emit!(Err(AdapterError::schema(
                NAME,
                file.audit_path.display().to_string(),
                "audit.jsonl is not valid UTF-8",
            )));
            return true;
        }
    };

    // Maps tool-call id -> tool name, built up as lines are read, so a later tool result
    // can be labelled with the name of the call it answers.
    let mut tool_call_names: HashMap<String, Extracted<String>> = HashMap::new();
    for (index, line) in text.lines().enumerate() {
        // Line numbers are 1-based and count blank lines too.
        let line_no = index + 1;
        if line.trim().is_empty() {
            continue;
        }
        // Oversized and unparseable lines are reported individually; reading continues.
        if line.len() > RECORD_CAP {
            emit!(Err(AdapterError::schema(
                NAME,
                format!("{}:{line_no}", file.audit_path.display()),
                format!(
                    "audit line exceeds adapter record cap: {} bytes > {RECORD_CAP}",
                    line.len()
                ),
            )));
            continue;
        }
        let mut record: Value = match serde_json::from_str(line) {
            Ok(value) => value,
            Err(error) => {
                emit!(Err(AdapterError::parse(
                    NAME,
                    file.audit_path.display().to_string(),
                    line_no,
                    error,
                )));
                continue;
            }
        };
        bound_value(&mut record);
        // Record this line's tool calls before building events, so the map already
        // includes them when `record_events` runs.
        capture_tool_call_names(&record, &mut tool_call_names);
        match record_events(&session_id, line_no, &record, created_at, &tool_call_names) {
            Ok(events) => {
                for event in events {
                    emit!(Ok(AdapterYield::Event(event)));
                }
            }
            Err(message) => emit!(Err(AdapterError::schema(
                NAME,
                format!("{}:{line_no}", file.audit_path.display()),
                message,
            ))),
        }
    }
    true
}
373
374fn read_json(path: &Path) -> Result<Value, AdapterError> {
377 use std::io::Read;
378 let io = |source| AdapterError::io(NAME, path.display().to_string(), source);
379 let mut file = std::fs::File::open(path).map_err(io)?;
380 let len = file.metadata().map_err(io)?.len();
381 if len > RECORD_CAP as u64 {
382 return Err(AdapterError::schema(
383 NAME,
384 path.display().to_string(),
385 format!("json file exceeds adapter record cap: {len} bytes > {RECORD_CAP}"),
386 ));
387 }
388 let mut bytes = Vec::with_capacity(len as usize);
389 file.read_to_end(&mut bytes).map_err(io)?;
390 let mut value: Value = serde_json::from_slice(&bytes)
391 .map_err(|error| AdapterError::parse(NAME, path.display().to_string(), 1, error))?;
392 bound_value(&mut value);
393 Ok(value)
394}
395
396fn build_session(file: &CoworkSession, meta: &Value) -> Result<Session, AdapterError> {
397 let display = file.meta_path.display().to_string();
398 let session_id = file.session_id.clone();
404
405 let created_at = meta
406 .get("createdAt")
407 .and_then(Value::as_i64)
408 .and_then(DateTime::from_timestamp_millis)
409 .ok_or_else(|| {
410 AdapterError::schema(
411 NAME,
412 display.clone(),
413 "metadata missing numeric `createdAt`",
414 )
415 })?;
416
417 let project = meta
421 .get("userSelectedFolders")
422 .and_then(Value::as_array)
423 .and_then(|folders| folders.first())
424 .filter(|first| first.as_str().is_some_and(|s| !s.is_empty()))
425 .and_then(|first| extract_self_str(first))
426 .or_else(|| extract_str(meta, "cwd").filter(|cwd| !cwd.trim().is_empty()))
427 .ok_or_else(|| {
428 AdapterError::schema(
429 NAME,
430 display,
431 "metadata has neither `userSelectedFolders[0]` nor `cwd` for the project",
432 )
433 })?;
434
435 let mut options = source_options(NAME, meta);
436 if let Some(source) = options.get_mut("source").and_then(Value::as_object_mut) {
437 source.insert(
438 "relative_dir".to_owned(),
439 json!(file.relative_dir.to_string_lossy()),
440 );
441 for key in [
442 "model",
443 "title",
444 "cliSessionId",
445 "systemPrompt",
446 "initialMessage",
447 "enabledMcpTools",
448 "vmProcessName",
449 "accountName",
450 ] {
451 if let Some(value) = meta.get(key) {
452 source.insert(key.to_owned(), value.clone());
453 }
454 }
455 }
456
457 Ok(Session {
458 id: session_id,
459 parent_session_id: None,
460 parent_message_id: None,
461 source_agent: NAME.to_owned(),
462 created_at,
463 project,
464 options,
465 })
466}
467
468fn capture_tool_call_names(record: &Value, map: &mut HashMap<String, Extracted<String>>) {
472 let Some(items) = record
473 .get("message")
474 .and_then(|message| message.get("content"))
475 .and_then(Value::as_array)
476 else {
477 return;
478 };
479 for item in items {
480 if !matches!(
481 item.get("type").and_then(Value::as_str),
482 Some("tool_use") | Some("server_tool_use")
483 ) {
484 continue;
485 }
486 let (Some(id), Some(name)) = (item.str_field("id"), extract_str(item, "name")) else {
487 continue;
488 };
489 map.insert(id.to_owned(), name);
490 }
491}
492
493fn record_events(
497 session_id: &str,
498 line: usize,
499 record: &Value,
500 default_timestamp: DateTime<Utc>,
501 tool_call_names: &HashMap<String, Extracted<String>>,
502) -> Result<Vec<IngestEvent>, String> {
503 let timestamp = record_timestamp(record).unwrap_or(default_timestamp);
504 let uuid = record
505 .get("uuid")
506 .and_then(Value::as_str)
507 .map_or_else(|| format!("{session_id}:{line}"), ToOwned::to_owned);
508 let rtype = record.get("type").and_then(Value::as_str);
509
510 match rtype {
511 Some("user") | Some("assistant") => {
512 let message_value = record.get("message").unwrap_or(&Value::Null);
513 message_events(
514 session_id,
515 &uuid,
516 timestamp,
517 record,
518 message_value,
519 tool_call_names,
520 line,
521 )
522 }
523 _ => {
527 let content = extract_str(record, "subtype").or_else(|| extract_str(record, "type"));
528 Ok(vec![IngestEvent::Message(Message::System {
529 id: uuid,
530 session_id: session_id.to_owned(),
531 timestamp,
532 content,
533 options: row_options(record, line),
534 })])
535 }
536 }
537}
538
539fn record_timestamp(record: &Value) -> Option<DateTime<Utc>> {
540 record
541 .get("_audit_timestamp")
542 .or_else(|| record.get("timestamp"))
543 .and_then(Value::as_str)
544 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
545 .map(|dt| dt.with_timezone(&Utc))
546}
547
548fn message_events(
549 session_id: &str,
550 uuid: &str,
551 timestamp: DateTime<Utc>,
552 record: &Value,
553 message_value: &Value,
554 tool_call_names: &HashMap<String, Extracted<String>>,
555 line: usize,
556) -> Result<Vec<IngestEvent>, String> {
557 let role = message_value
558 .get("role")
559 .and_then(Value::as_str)
560 .ok_or_else(|| "message missing role".to_owned())?;
561 let content = message_value.get("content").unwrap_or(&Value::Null);
562 let mut parts = Vec::new();
563 let message = match (role, content) {
564 ("user", Value::String(_)) => {
565 parts.push(text_part(
566 session_id,
567 uuid,
568 0,
569 extract_self_str(content),
570 Provenance::Conversational,
571 ));
572 Message::User {
573 id: uuid.to_owned(),
574 session_id: session_id.to_owned(),
575 timestamp,
576 options: row_options(record, line),
577 }
578 }
579 ("user", Value::Array(items)) if !items.is_empty() && items.iter().all(is_tool_result) => {
580 let source_tool_result = record.get("tool_use_result").cloned();
581 parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
582 tool_result_part(
583 session_id,
584 uuid,
585 ordinal,
586 item,
587 source_tool_result.as_ref(),
588 tool_call_names,
589 )
590 }));
591 Message::Tool {
592 id: uuid.to_owned(),
593 session_id: session_id.to_owned(),
594 timestamp,
595 options: row_options(record, line),
596 }
597 }
598 ("user", Value::Array(items)) => {
599 parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
600 user_part(session_id, uuid, ordinal, item, tool_call_names)
601 }));
602 Message::User {
603 id: uuid.to_owned(),
604 session_id: session_id.to_owned(),
605 timestamp,
606 options: row_options(record, line),
607 }
608 }
609 ("assistant", Value::Array(items)) => {
610 parts.extend(
611 items
612 .iter()
613 .enumerate()
614 .map(|(ordinal, item)| assistant_part(session_id, uuid, ordinal, item)),
615 );
616 Message::Assistant {
617 id: uuid.to_owned(),
618 session_id: session_id.to_owned(),
619 timestamp,
620 options: assistant_options(record, message_value, line),
621 }
622 }
623 _ => {
624 return Ok(vec![message_carrier_event(
625 session_id, uuid, timestamp, record, line, role,
626 )]);
627 }
628 };
629
630 let mut events = Vec::with_capacity(parts.len() + 1);
631 events.push(IngestEvent::Message(message));
632 events.extend(parts.into_iter().map(IngestEvent::Part));
633 Ok(events)
634}
635
636fn message_carrier_event(
637 session_id: &str,
638 uuid: &str,
639 timestamp: DateTime<Utc>,
640 record: &Value,
641 line: usize,
642 role: &str,
643) -> IngestEvent {
644 IngestEvent::Message(Message::System {
645 id: uuid.to_owned(),
646 session_id: session_id.to_owned(),
647 timestamp,
648 content: extract_self_str(&Value::String(role.to_owned())),
649 options: row_options(record, line),
650 })
651}
652
653fn text_part(
654 session_id: &str,
655 message_id: &str,
656 ordinal: usize,
657 text: Option<Extracted<String>>,
658 provenance: Provenance,
659) -> Part {
660 Part {
661 session_id: session_id.to_owned(),
662 id: part_id(message_id, ordinal),
663 message_id: message_id.to_owned(),
664 ordinal: part_ordinal(ordinal),
665 provenance,
666 options: empty_options(),
667 kind: PartKind::Text { text },
668 }
669}
670
671fn user_part(
672 session_id: &str,
673 message_id: &str,
674 ordinal: usize,
675 value: &Value,
676 tool_call_names: &HashMap<String, Extracted<String>>,
677) -> Part {
678 match value.get("type").and_then(Value::as_str) {
679 Some("text") => text_part(
680 session_id,
681 message_id,
682 ordinal,
683 extract_str(value, "text"),
684 Provenance::Conversational,
685 ),
686 Some("image") | Some("file") => file_part(
687 session_id,
688 message_id,
689 ordinal,
690 value,
691 Provenance::Conversational,
692 ),
693 Some("tool_result") => tool_result_part(
694 session_id,
695 message_id,
696 ordinal,
697 value,
698 None,
699 tool_call_names,
700 ),
701 _ => text_part(
704 session_id,
705 message_id,
706 ordinal,
707 Some(extract_compact_repr(value)),
708 Provenance::Conversational,
709 ),
710 }
711}
712
713fn assistant_part(session_id: &str, message_id: &str, ordinal: usize, value: &Value) -> Part {
714 match value.get("type").and_then(Value::as_str) {
715 Some("text") => text_part(
716 session_id,
717 message_id,
718 ordinal,
719 extract_str(value, "text"),
720 Provenance::Conversational,
721 ),
722 Some("thinking") => Part {
723 session_id: session_id.to_owned(),
724 id: part_id(message_id, ordinal),
725 message_id: message_id.to_owned(),
726 ordinal: part_ordinal(ordinal),
727 provenance: Provenance::Conversational,
728 options: signature_options(value),
729 kind: PartKind::Reasoning {
730 text: extract_str(value, "thinking"),
731 },
732 },
733 Some(kind @ ("tool_use" | "server_tool_use")) => Part {
734 session_id: session_id.to_owned(),
735 id: part_id(message_id, ordinal),
736 message_id: message_id.to_owned(),
737 ordinal: part_ordinal(ordinal),
738 provenance: Provenance::Conversational,
739 options: empty_options(),
740 kind: PartKind::ToolCall {
741 call_id: extract_str(value, "id"),
742 name: extract_str(value, "name"),
743 params: value.get("input").cloned().unwrap_or(Value::Null),
744 provider_executed: kind == "server_tool_use",
745 },
746 },
747 Some("image") | Some("file") => file_part(
748 session_id,
749 message_id,
750 ordinal,
751 value,
752 Provenance::Conversational,
753 ),
754 _ => text_part(
755 session_id,
756 message_id,
757 ordinal,
758 Some(extract_compact_repr(value)),
759 Provenance::Conversational,
760 ),
761 }
762}
763
764fn tool_result_part(
765 session_id: &str,
766 message_id: &str,
767 ordinal: usize,
768 value: &Value,
769 source_tool_result: Option<&Value>,
770 tool_call_names: &HashMap<String, Extracted<String>>,
771) -> Part {
772 let call_id = extract_str(value, "tool_use_id");
773 let name = value
776 .str_field("tool_use_id")
777 .and_then(|id| tool_call_names.get(id))
778 .cloned();
779 let result = value
780 .get("content")
781 .cloned()
782 .or_else(|| source_tool_result.cloned())
783 .unwrap_or(Value::Null);
784 Part {
785 session_id: session_id.to_owned(),
786 id: part_id(message_id, ordinal),
787 message_id: message_id.to_owned(),
788 ordinal: part_ordinal(ordinal),
789 provenance: Provenance::Injected,
791 options: empty_options(),
792 kind: PartKind::ToolResult {
793 call_id,
794 name,
795 is_failure: value
796 .get("is_error")
797 .and_then(Value::as_bool)
798 .unwrap_or(false),
799 result,
800 },
801 }
802}
803
804fn file_part(
805 session_id: &str,
806 message_id: &str,
807 ordinal: usize,
808 value: &Value,
809 provenance: Provenance,
810) -> Part {
811 let media_type = value
812 .get("media_type")
813 .or_else(|| value.get("mime_type"))
814 .and_then(Value::as_str)
815 .map(ToOwned::to_owned);
816 let file_name = value
817 .get("file_name")
818 .or_else(|| value.get("name"))
819 .and_then(Value::as_str)
820 .map(ToOwned::to_owned);
821 let data = if let Some(source) = value.get("source") {
822 if let Some(url) = source.get("url").and_then(Value::as_str) {
823 FileData::Url(url.to_owned())
824 } else if let Some(bytes) = source.get("data").and_then(Value::as_str) {
825 FileData::String(bytes.to_owned())
826 } else {
827 FileData::String(compact_json(source))
828 }
829 } else if let Some(url) = value.get("url").and_then(Value::as_str) {
830 FileData::Url(url.to_owned())
831 } else {
832 FileData::String(compact_json(value))
833 };
834 Part {
835 session_id: session_id.to_owned(),
836 id: part_id(message_id, ordinal),
837 message_id: message_id.to_owned(),
838 ordinal: part_ordinal(ordinal),
839 provenance,
840 options: empty_options(),
841 kind: PartKind::File {
842 media_type,
843 file_name,
844 data,
845 },
846 }
847}
848
849fn row_options(record: &Value, line: usize) -> ProviderOptions {
850 let mut options = source_options(NAME, record);
851 if let Some(source) = options.get_mut("source").and_then(Value::as_object_mut) {
852 source.insert("line".to_owned(), json!(line));
853 source.insert("raw_type".to_owned(), json!(record.get("type")));
854 if let Some(subtype) = record.get("subtype") {
855 source.insert("subtype".to_owned(), subtype.clone());
856 }
857 }
858 options
859}
860
861fn assistant_options(record: &Value, message_value: &Value, line: usize) -> ProviderOptions {
862 let mut options = row_options(record, line);
863 let anthropic = json!({
864 "id": message_value.get("id"),
865 "model": message_value.get("model"),
866 "stop_reason": message_value.get("stop_reason"),
867 "usage": message_value.get("usage"),
868 });
869 options.insert("anthropic".to_owned(), anthropic);
870 options
871}
872
873fn signature_options(value: &Value) -> ProviderOptions {
874 let mut options = ProviderOptions::new();
875 if let Some(signature) = value.get("signature").and_then(Value::as_str) {
876 options.insert("anthropic".to_owned(), json!({ "signature": signature }));
877 }
878 options
879}
880
881fn is_tool_result(value: &Value) -> bool {
882 value.get("type").and_then(Value::as_str) == Some("tool_result")
883}
884
885fn serialize_session(
886 session: &crate::sessions::SessionWithMessages,
887 fidelity: RestoreFidelity,
888) -> Result<Vec<RestoredFile>, AdapterError> {
889 let session_raw = raw_record(&session.session.options);
895 if fidelity == RestoreFidelity::Native && session_raw.is_none() {
896 return serialize_session(session, RestoreFidelity::Foreign);
897 }
898
899 let relative_dir = session
900 .session
901 .options
902 .get("source")
903 .and_then(|source| source.get("relative_dir"))
904 .and_then(Value::as_str)
905 .map(PathBuf::from)
906 .unwrap_or_else(|| PathBuf::from(&session.session.id));
907
908 let mut messages = session.messages.clone();
909 if fidelity == RestoreFidelity::Native {
910 messages.sort_by(|left, right| {
911 source_line(left.message.options())
912 .cmp(&source_line(right.message.options()))
913 .then_with(|| by_timestamp_then_id(left, right))
914 });
915 } else {
916 messages.sort_by(by_timestamp_then_id);
917 }
918
919 let mut records = Vec::with_capacity(messages.len());
920 for message in &messages {
921 if fidelity == RestoreFidelity::Native {
922 if let Some(raw) = raw_record(message.message.options()) {
923 records.push(raw);
924 }
925 continue;
926 }
927 if let Some(record) = foreign_record(&session.session.id, message) {
928 records.push(record);
929 }
930 }
931
932 let mut files = vec![RestoredFile::new(
933 relative_dir.join("audit.jsonl"),
934 jsonl_bytes(NAME, &records)?,
935 fidelity,
936 )];
937
938 let meta_value = match fidelity {
940 RestoreFidelity::Native => session_raw,
941 RestoreFidelity::Foreign => Some(foreign_metadata(session)),
942 };
943 if let (Some(meta), Some(parent), Some(dir_name)) = (
944 meta_value,
945 relative_dir.parent(),
946 relative_dir.file_name().and_then(|name| name.to_str()),
947 ) {
948 files.push(RestoredFile::new(
949 parent.join(format!("{dir_name}.json")),
950 serde_json::to_vec(&meta).map_err(|error| {
951 AdapterError::schema(
952 NAME,
953 &session.session.id,
954 format!("json encode failed: {error}"),
955 )
956 })?,
957 fidelity,
958 ));
959 }
960 Ok(files)
961}
962
963fn foreign_metadata(session: &crate::sessions::SessionWithMessages) -> Value {
966 json!({
967 "sessionId": session.session.id,
968 "createdAt": session.session.created_at.timestamp_millis(),
969 "cwd": &*session.session.project,
970 })
971}
972
973fn foreign_record(session_id: &str, message: &crate::sessions::MessageWithParts) -> Option<Value> {
976 let (rtype, role) = match &message.message {
977 Message::User { .. } => ("user", "user"),
978 Message::Assistant { .. } => ("assistant", "assistant"),
979 Message::Tool { .. } => ("user", "user"),
980 Message::System { .. } => return None,
982 };
983 let content = Value::Array(message.parts.iter().map(audit_part).collect());
984 Some(json!({
985 "type": rtype,
986 "session_id": session_id,
987 "uuid": message.message.id(),
988 "message": { "role": role, "content": content },
989 "_audit_timestamp": message
990 .message
991 .timestamp()
992 .to_rfc3339_opts(SecondsFormat::Millis, true),
993 }))
994}
995
996fn audit_part(part: &Part) -> Value {
997 match &part.kind {
998 PartKind::Text { text } => json!({ "type": "text", "text": extracted_text(text) }),
999 PartKind::Reasoning { text } => {
1000 json!({ "type": "thinking", "thinking": extracted_text(text) })
1001 }
1002 PartKind::ToolCall {
1003 call_id,
1004 name,
1005 params,
1006 provider_executed,
1007 } => json!({
1008 "type": if *provider_executed { "server_tool_use" } else { "tool_use" },
1009 "id": extracted_text(call_id),
1010 "name": extracted_text(name),
1011 "input": params,
1012 }),
1013 PartKind::ToolResult {
1014 call_id,
1015 is_failure,
1016 result,
1017 ..
1018 } => json!({
1019 "type": "tool_result",
1020 "tool_use_id": extracted_text(call_id),
1021 "is_error": is_failure,
1022 "content": result,
1023 }),
1024 other => json!({
1025 "type": "text",
1026 "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null)),
1027 }),
1028 }
1029}
1030
1031fn source_line(options: &ProviderOptions) -> Option<u64> {
1034 options
1035 .get("source")
1036 .and_then(|source| source.get("line"))
1037 .and_then(Value::as_u64)
1038}
1039
#[cfg(test)]
mod tests {
    // Tests may panic on failed expectations, so the unwrap/expect lints are relaxed here.
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use crate::{handlers::ingest_adapter, sessions::Store};
    use tempfile::TempDir;

    /// Root of the checked-in fixture tree used as the adapter root in these tests.
    const FIXTURES: &str = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/fixtures/adapter/claude_desktop_app/local-agent-mode-sessions"
    );
    /// Id that must not show up in any ingested session id (see the assertion message in
    /// `ingests_cowork_fixture_into_canonical_shape`).
    const INNER_LOOP_ID: &str = "a9985b0b-2f5e-4125-b105-7f62376f5509";

    // The factory's default probe resolves to the expected path components under home.
    #[test]
    fn probe_default_finds_cowork_store_under_home() -> anyhow::Result<()> {
        crate::adapter::test_support::assert_probe_default(
            &ClaudeDesktopAppFactory,
            &[
                "Library",
                "Application Support",
                "Claude",
                "local-agent-mode-sessions",
            ],
        )
    }

    // End-to-end ingest of the fixture tree: checks the session set and that each
    // expected message/part kind appears at least once.
    #[tokio::test(flavor = "multi_thread")]
    async fn ingests_cowork_fixture_into_canonical_shape() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let adapter = ClaudeDesktopAppAdapter::new(FIXTURES);
        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
        assert_eq!(summary.dropped_sessions, 0, "no session-level rejections");

        let ids = store.session_ids().await?;
        assert_eq!(ids.len(), 4, "exactly the four audit.jsonl sessions");
        assert!(
            !ids.iter().any(|id| id.contains(INNER_LOOP_ID)),
            "the nested inner Claude Code loop must not be ingested as a session",
        );
        for id in &ids {
            assert!(
                id.starts_with("local_"),
                "session id is the metadata sessionId (local_<uuid>): {id}",
            );
        }

        // Flags flipped when a given shape is seen in any session.
        let mut saw_call = false;
        let mut saw_resolved_result = false;
        let mut saw_reasoning = false;
        let mut saw_system = false;
        for id in &ids {
            let session = store.get_session(id).await?.expect("session round-trips");
            assert_eq!(session.session.source_agent, NAME);
            assert!(
                !(*session.session.project).is_empty(),
                "spec.md#model-project-non-empty",
            );
            for stored in &session.messages {
                if matches!(stored.message, Message::System { .. }) {
                    saw_system = true;
                }
                for part in &stored.parts {
                    match &part.kind {
                        PartKind::ToolCall { .. } => saw_call = true,
                        PartKind::ToolResult { name, .. } if name.is_some() => {
                            saw_resolved_result = true;
                        }
                        PartKind::Reasoning { .. } => saw_reasoning = true,
                        _ => {}
                    }
                }
            }
        }
        assert!(saw_call, "assistant tool_use -> ToolCall");
        assert!(
            saw_resolved_result,
            "tool_result name resolved via the per-session tool_use map",
        );
        assert!(saw_reasoning, "assistant thinking -> Reasoning");
        assert!(
            saw_system,
            "system/result/... records become System carriers"
        );
        Ok(())
    }

    // Ingest the fixtures, serialize natively, write to disk, re-ingest, and compare
    // the number of sessions.
    #[tokio::test(flavor = "multi_thread")]
    async fn native_restore_round_trips() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path().join("store")).await?;
        let adapter = ClaudeDesktopAppAdapter::new(FIXTURES);
        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
        let original = store.session_ids().await?;

        let mut files = Vec::new();
        for id in &original {
            let session = store.get_session(id).await?.expect("round-trips");
            files.extend(ClaudeDesktopAppFactory.serialize(&session, RestoreFidelity::Native)?);
        }
        let restore_root = temp.path().join("restore");
        crate::adapter::write_restored_files(&restore_root, files)?;

        // Re-ingest the restored tree into a separate store.
        let restore_store = Store::open_local(temp.path().join("restore-store")).await?;
        let restored = ClaudeDesktopAppAdapter::new(&restore_root);
        ingest_adapter(
            &restore_store,
            &restored,
            &crate::adapter::NoopOracle,
            |_| {},
        )
        .await?;
        assert_eq!(
            restore_store.session_ids().await?.len(),
            original.len(),
            "native restore re-ingests as the same session set",
        );
        Ok(())
    }

    // A user record with null content has no dedicated mapping, so it must come back
    // as one System carrier that keeps the record's uuid and has the role as content.
    #[test]
    fn unexpected_message_content_becomes_lossless_carrier() {
        let names = HashMap::new();
        let record = json!({
            "type": "user",
            "uuid": "local-message-1",
            "_audit_timestamp": "2026-06-01T00:00:00Z",
            "message": {
                "role": "user",
                "content": null,
            },
        });

        let events = record_events("local_session", 7, &record, Utc::now(), &names)
            .expect("carrier is valid");
        assert_eq!(events.len(), 1);
        assert!(matches!(
            &events[0],
            IngestEvent::Message(Message::System { id, content, .. })
                if id == "local-message-1" && content.as_deref().map(String::as_str) == Some("user")
        ));
    }
}