1use std::{
33 collections::HashMap,
34 path::{Path, PathBuf},
35};
36
37use async_stream::stream;
38use chrono::{DateTime, SecondsFormat, Utc};
39use serde_json::{Value, json};
40use tokio::sync::mpsc;
41use walkdir::WalkDir;
42
43use crate::{
44 sessions::IngestEvent,
45 wire::{FileData, Message, Part, PartKind, Provenance, ProviderOptions, Session},
46};
47
48use super::{
49 Adapter, AdapterError, AdapterFactory, AdapterYield, AdapterYieldStream, DiscoverFuture, Env,
50 RestoreFidelity, RestoredFile, SkipOracle, SkipReason, by_timestamp_then_id, compact_json,
51 config_path, empty_options,
52 extract::{
53 Extracted, Source, bound_value, extract_compact_repr, extract_self_str, extract_str,
54 },
55 extracted_text,
56 jsonl::RECORD_CAP,
57 jsonl_bytes, part_id, part_ordinal, raw_record, source_options,
58};
59
60const NAME: &str = "claude-desktop-app";
61
62const CHANNEL_CAP: usize = 256;
65
66const SESSIONS_SUBDIR: &str = "local-agent-mode-sessions";
68
69pub struct ClaudeDesktopAppFactory;
72
73impl AdapterFactory for ClaudeDesktopAppFactory {
74 fn name(&self) -> &'static str {
75 NAME
76 }
77
78 fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
79 Ok(Box::new(ClaudeDesktopAppAdapter::new(config_path(
80 NAME, config,
81 )?)))
82 }
83
84 fn probe_default(&self, env: &Env) -> Option<Value> {
85 let path = cowork_root(&env.home);
86 path.exists().then(|| json!({ "path": path }))
87 }
88
89 fn serialize(
90 &self,
91 session: &crate::sessions::SessionWithMessages,
92 fidelity: RestoreFidelity,
93 ) -> Result<Vec<RestoredFile>, AdapterError> {
94 serialize_session(session, fidelity)
95 }
96}
97
98fn cowork_root(home: &Path) -> PathBuf {
100 home.join("Library")
101 .join("Application Support")
102 .join("Claude")
103 .join(SESSIONS_SUBDIR)
104}
105
106#[derive(Debug, Clone)]
108pub struct ClaudeDesktopAppAdapter {
109 root: PathBuf,
110}
111
112impl ClaudeDesktopAppAdapter {
113 pub fn new(root: impl Into<PathBuf>) -> Self {
114 Self { root: root.into() }
115 }
116}
117
118impl Adapter for ClaudeDesktopAppAdapter {
119 fn discover(&self) -> DiscoverFuture<'_> {
120 let root = self.root.clone();
121 Box::pin(async move {
122 tokio::task::spawn_blocking(move || collect_sessions(&root).map(|files| files.len()))
123 .await
124 .map_err(join_error)?
125 })
126 }
127
128 fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
129 let adapter = self.clone();
130 Box::pin(stream! {
131 let files = {
132 let root = adapter.root.clone();
133 tokio::task::spawn_blocking(move || collect_sessions(&root)).await
134 };
135 let files = match files {
136 Ok(Ok(files)) => files,
137 Ok(Err(error)) => { yield Err(error); return; }
138 Err(join) => { yield Err(join_error(join)); return; }
139 };
140
141 let mut survivors = Vec::with_capacity(files.len());
145 for file in files {
146 if let Some(ingested) = oracle.last_ingested_at(&file.session_id) {
147 let paths = (file.audit_path.clone(), file.meta_path.clone());
148 let mtime = tokio::task::spawn_blocking(move || {
149 newest_mtime(&paths.0).max(newest_mtime(&paths.1))
150 })
151 .await;
152 let mtime = match mtime {
153 Ok(mtime) => mtime,
154 Err(join) => { yield Err(join_error(join)); return; }
155 };
156 if let Some(mtime) = mtime
157 && mtime <= ingested
158 {
159 yield Ok(AdapterYield::Skipped {
160 session_id: Some(file.session_id.clone()),
161 project: None,
162 reason: SkipReason::Fresh,
163 });
164 continue;
165 }
166 }
167 survivors.push(file);
168 }
169
170 let (tx, mut rx) = mpsc::channel(CHANNEL_CAP);
171 let handle = tokio::task::spawn_blocking(move || read_sessions(survivors, &tx));
172 while let Some(item) = rx.recv().await {
173 yield item;
174 }
175 if let Err(join) = handle.await {
176 yield Err(join_error(join));
177 }
178 })
179 }
180}
181
182fn join_error(join: tokio::task::JoinError) -> AdapterError {
185 AdapterError::io(
186 NAME,
187 "blocking read task",
188 std::io::Error::other(join.to_string()),
189 )
190}
191
192struct CoworkSession {
196 session_id: String,
197 audit_path: PathBuf,
198 meta_path: PathBuf,
199 relative_dir: PathBuf,
200}
201
202fn collect_sessions(root: &Path) -> Result<Vec<CoworkSession>, AdapterError> {
206 if !root.exists() {
207 return Ok(Vec::new());
208 }
209 let io = |source| AdapterError::io(NAME, root.display().to_string(), source);
210 let mut out = Vec::new();
211 let walker = WalkDir::new(root).into_iter().filter_entry(|entry| {
212 !(entry.file_type().is_dir()
216 && entry
217 .file_name()
218 .to_str()
219 .is_some_and(|name| name.starts_with('.')))
220 });
221 for entry in walker {
222 let entry = entry.map_err(|error| io(error.into()))?;
223 if entry.file_name() != "audit.jsonl" {
224 continue;
225 }
226 let audit_path = entry.into_path();
227 let Some(dir) = audit_path.parent() else {
228 continue;
229 };
230 let Some(dir_name) = dir.file_name().and_then(|name| name.to_str()) else {
231 continue;
232 };
233 if !dir_name.starts_with("local_") {
237 continue;
238 }
239 let Some(workspace) = dir.parent() else {
240 continue;
241 };
242 let meta_path = workspace.join(format!("{dir_name}.json"));
243 let relative_dir = dir.strip_prefix(root).unwrap_or(dir).to_path_buf();
244 out.push(CoworkSession {
245 session_id: dir_name.to_owned(),
246 audit_path,
247 meta_path,
248 relative_dir,
249 });
250 }
251 out.sort_by(|a, b| a.audit_path.cmp(&b.audit_path));
252 Ok(out)
253}
254
255fn newest_mtime(path: &Path) -> Option<DateTime<Utc>> {
256 std::fs::metadata(path)
257 .and_then(|meta| meta.modified())
258 .ok()
259 .map(DateTime::<Utc>::from)
260}
261
262fn read_sessions(
263 sessions: Vec<CoworkSession>,
264 tx: &mpsc::Sender<Result<AdapterYield, AdapterError>>,
265) {
266 for session in sessions {
267 if !read_one_session(session, tx) {
268 return;
269 }
270 }
271}
272
273fn read_one_session(
275 file: CoworkSession,
276 tx: &mpsc::Sender<Result<AdapterYield, AdapterError>>,
277) -> bool {
278 macro_rules! emit {
279 ($item:expr) => {
280 if tx.blocking_send($item).is_err() {
281 return false;
282 }
283 };
284 }
285
286 let meta = match read_json(&file.meta_path) {
287 Ok(value) => value,
288 Err(error) => {
289 emit!(Err(error));
290 return true;
291 }
292 };
293 let session = match build_session(&file, &meta) {
294 Ok(session) => session,
295 Err(error) => {
296 emit!(Err(error));
297 return true;
298 }
299 };
300 let created_at = session.created_at;
301 let session_id = session.id.clone();
302 emit!(Ok(AdapterYield::Event(IngestEvent::Session(session))));
303
304 let bytes = match std::fs::read(&file.audit_path) {
305 Ok(bytes) => bytes,
306 Err(error) => {
307 emit!(Err(AdapterError::io(
308 NAME,
309 file.audit_path.display().to_string(),
310 error
311 )));
312 return true;
313 }
314 };
315 let text = match std::str::from_utf8(&bytes) {
316 Ok(text) => text,
317 Err(_) => {
318 emit!(Err(AdapterError::schema(
319 NAME,
320 file.audit_path.display().to_string(),
321 "audit.jsonl is not valid UTF-8",
322 )));
323 return true;
324 }
325 };
326
327 let mut tool_call_names: HashMap<String, Extracted<String>> = HashMap::new();
328 for (index, line) in text.lines().enumerate() {
329 let line_no = index + 1;
330 if line.trim().is_empty() {
331 continue;
332 }
333 if line.len() > RECORD_CAP {
334 emit!(Err(AdapterError::schema(
335 NAME,
336 format!("{}:{line_no}", file.audit_path.display()),
337 format!(
338 "audit line exceeds adapter record cap: {} bytes > {RECORD_CAP}",
339 line.len()
340 ),
341 )));
342 continue;
343 }
344 let mut record: Value = match serde_json::from_str(line) {
345 Ok(value) => value,
346 Err(error) => {
347 emit!(Err(AdapterError::parse(
348 NAME,
349 file.audit_path.display().to_string(),
350 line_no,
351 error,
352 )));
353 continue;
354 }
355 };
356 bound_value(&mut record);
357 capture_tool_call_names(&record, &mut tool_call_names);
358 match record_events(&session_id, line_no, &record, created_at, &tool_call_names) {
359 Ok(events) => {
360 for event in events {
361 emit!(Ok(AdapterYield::Event(event)));
362 }
363 }
364 Err(message) => emit!(Err(AdapterError::schema(
365 NAME,
366 format!("{}:{line_no}", file.audit_path.display()),
367 message,
368 ))),
369 }
370 }
371 true
372}
373
374fn read_json(path: &Path) -> Result<Value, AdapterError> {
377 use std::io::Read;
378 let io = |source| AdapterError::io(NAME, path.display().to_string(), source);
379 let mut file = std::fs::File::open(path).map_err(io)?;
380 let len = file.metadata().map_err(io)?.len();
381 if len > RECORD_CAP as u64 {
382 return Err(AdapterError::schema(
383 NAME,
384 path.display().to_string(),
385 format!("json file exceeds adapter record cap: {len} bytes > {RECORD_CAP}"),
386 ));
387 }
388 let mut bytes = Vec::with_capacity(len as usize);
389 file.read_to_end(&mut bytes).map_err(io)?;
390 let mut value: Value = serde_json::from_slice(&bytes)
391 .map_err(|error| AdapterError::parse(NAME, path.display().to_string(), 1, error))?;
392 bound_value(&mut value);
393 Ok(value)
394}
395
396fn build_session(file: &CoworkSession, meta: &Value) -> Result<Session, AdapterError> {
397 let display = file.meta_path.display().to_string();
398 let session_id = file.session_id.clone();
404
405 let created_at = meta
406 .get("createdAt")
407 .and_then(Value::as_i64)
408 .and_then(DateTime::from_timestamp_millis)
409 .ok_or_else(|| {
410 AdapterError::schema(
411 NAME,
412 display.clone(),
413 "metadata missing numeric `createdAt`",
414 )
415 })?;
416
417 let project = meta
421 .get("userSelectedFolders")
422 .and_then(Value::as_array)
423 .and_then(|folders| folders.first())
424 .filter(|first| first.as_str().is_some_and(|s| !s.is_empty()))
425 .and_then(|first| extract_self_str(first))
426 .or_else(|| extract_str(meta, "cwd").filter(|cwd| !cwd.trim().is_empty()))
427 .ok_or_else(|| {
428 AdapterError::schema(
429 NAME,
430 display,
431 "metadata has neither `userSelectedFolders[0]` nor `cwd` for the project",
432 )
433 })?;
434
435 let mut options = source_options(NAME, meta);
436 if let Some(source) = options.get_mut("source").and_then(Value::as_object_mut) {
437 source.insert(
438 "relative_dir".to_owned(),
439 json!(file.relative_dir.to_string_lossy()),
440 );
441 for key in [
442 "model",
443 "title",
444 "cliSessionId",
445 "systemPrompt",
446 "initialMessage",
447 "enabledMcpTools",
448 "vmProcessName",
449 "accountName",
450 ] {
451 if let Some(value) = meta.get(key) {
452 source.insert(key.to_owned(), value.clone());
453 }
454 }
455 }
456
457 Ok(Session {
458 id: session_id,
459 parent_session_id: None,
460 parent_message_id: None,
461 source_agent: NAME.to_owned(),
462 created_at,
463 project,
464 options,
465 })
466}
467
468fn capture_tool_call_names(record: &Value, map: &mut HashMap<String, Extracted<String>>) {
472 let Some(items) = record
473 .get("message")
474 .and_then(|message| message.get("content"))
475 .and_then(Value::as_array)
476 else {
477 return;
478 };
479 for item in items {
480 if !matches!(
481 item.get("type").and_then(Value::as_str),
482 Some("tool_use") | Some("server_tool_use")
483 ) {
484 continue;
485 }
486 let (Some(id), Some(name)) = (item.str_field("id"), extract_str(item, "name")) else {
487 continue;
488 };
489 map.insert(id.to_owned(), name);
490 }
491}
492
493fn record_events(
497 session_id: &str,
498 line: usize,
499 record: &Value,
500 default_timestamp: DateTime<Utc>,
501 tool_call_names: &HashMap<String, Extracted<String>>,
502) -> Result<Vec<IngestEvent>, String> {
503 let timestamp = record_timestamp(record).unwrap_or(default_timestamp);
504 let uuid = record
505 .get("uuid")
506 .and_then(Value::as_str)
507 .map_or_else(|| format!("{session_id}:{line}"), ToOwned::to_owned);
508 let rtype = record.get("type").and_then(Value::as_str);
509
510 match rtype {
511 Some("user") | Some("assistant") => {
512 let message_value = record.get("message").unwrap_or(&Value::Null);
513 message_events(
514 session_id,
515 &uuid,
516 timestamp,
517 record,
518 message_value,
519 tool_call_names,
520 line,
521 )
522 }
523 _ => {
527 let content = extract_str(record, "subtype").or_else(|| extract_str(record, "type"));
528 Ok(vec![IngestEvent::Message(Message::System {
529 id: uuid,
530 session_id: session_id.to_owned(),
531 timestamp,
532 content,
533 options: row_options(record, line),
534 })])
535 }
536 }
537}
538
539fn record_timestamp(record: &Value) -> Option<DateTime<Utc>> {
540 record
541 .get("_audit_timestamp")
542 .or_else(|| record.get("timestamp"))
543 .and_then(Value::as_str)
544 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
545 .map(|dt| dt.with_timezone(&Utc))
546}
547
548fn message_events(
549 session_id: &str,
550 uuid: &str,
551 timestamp: DateTime<Utc>,
552 record: &Value,
553 message_value: &Value,
554 tool_call_names: &HashMap<String, Extracted<String>>,
555 line: usize,
556) -> Result<Vec<IngestEvent>, String> {
557 let role = message_value
558 .get("role")
559 .and_then(Value::as_str)
560 .ok_or_else(|| "message missing role".to_owned())?;
561 let content = message_value.get("content").unwrap_or(&Value::Null);
562 let mut parts = Vec::new();
563 let message = match (role, content) {
564 ("user", Value::String(_)) => {
565 parts.push(text_part(
566 session_id,
567 uuid,
568 0,
569 extract_self_str(content),
570 Provenance::Conversational,
571 ));
572 Message::User {
573 id: uuid.to_owned(),
574 session_id: session_id.to_owned(),
575 timestamp,
576 options: row_options(record, line),
577 }
578 }
579 ("user", Value::Array(items)) if !items.is_empty() && items.iter().all(is_tool_result) => {
580 let source_tool_result = record.get("tool_use_result").cloned();
581 parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
582 tool_result_part(
583 session_id,
584 uuid,
585 ordinal,
586 item,
587 source_tool_result.as_ref(),
588 tool_call_names,
589 )
590 }));
591 Message::Tool {
592 id: uuid.to_owned(),
593 session_id: session_id.to_owned(),
594 timestamp,
595 options: row_options(record, line),
596 }
597 }
598 ("user", Value::Array(items)) => {
599 parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
600 user_part(session_id, uuid, ordinal, item, tool_call_names)
601 }));
602 Message::User {
603 id: uuid.to_owned(),
604 session_id: session_id.to_owned(),
605 timestamp,
606 options: row_options(record, line),
607 }
608 }
609 ("assistant", Value::Array(items)) => {
610 parts.extend(
611 items
612 .iter()
613 .enumerate()
614 .map(|(ordinal, item)| assistant_part(session_id, uuid, ordinal, item)),
615 );
616 Message::Assistant {
617 id: uuid.to_owned(),
618 session_id: session_id.to_owned(),
619 timestamp,
620 options: assistant_options(record, message_value, line),
621 }
622 }
623 (other, _) => {
624 return Err(format!("unsupported message role {other}"));
625 }
626 };
627
628 let mut events = Vec::with_capacity(parts.len() + 1);
629 events.push(IngestEvent::Message(message));
630 events.extend(parts.into_iter().map(IngestEvent::Part));
631 Ok(events)
632}
633
634fn text_part(
635 session_id: &str,
636 message_id: &str,
637 ordinal: usize,
638 text: Option<Extracted<String>>,
639 provenance: Provenance,
640) -> Part {
641 Part {
642 session_id: session_id.to_owned(),
643 id: part_id(message_id, ordinal),
644 message_id: message_id.to_owned(),
645 ordinal: part_ordinal(ordinal),
646 provenance,
647 options: empty_options(),
648 kind: PartKind::Text { text },
649 }
650}
651
652fn user_part(
653 session_id: &str,
654 message_id: &str,
655 ordinal: usize,
656 value: &Value,
657 tool_call_names: &HashMap<String, Extracted<String>>,
658) -> Part {
659 match value.get("type").and_then(Value::as_str) {
660 Some("text") => text_part(
661 session_id,
662 message_id,
663 ordinal,
664 extract_str(value, "text"),
665 Provenance::Conversational,
666 ),
667 Some("image") | Some("file") => file_part(
668 session_id,
669 message_id,
670 ordinal,
671 value,
672 Provenance::Conversational,
673 ),
674 Some("tool_result") => tool_result_part(
675 session_id,
676 message_id,
677 ordinal,
678 value,
679 None,
680 tool_call_names,
681 ),
682 _ => text_part(
685 session_id,
686 message_id,
687 ordinal,
688 Some(extract_compact_repr(value)),
689 Provenance::Conversational,
690 ),
691 }
692}
693
694fn assistant_part(session_id: &str, message_id: &str, ordinal: usize, value: &Value) -> Part {
695 match value.get("type").and_then(Value::as_str) {
696 Some("text") => text_part(
697 session_id,
698 message_id,
699 ordinal,
700 extract_str(value, "text"),
701 Provenance::Conversational,
702 ),
703 Some("thinking") => Part {
704 session_id: session_id.to_owned(),
705 id: part_id(message_id, ordinal),
706 message_id: message_id.to_owned(),
707 ordinal: part_ordinal(ordinal),
708 provenance: Provenance::Conversational,
709 options: signature_options(value),
710 kind: PartKind::Reasoning {
711 text: extract_str(value, "thinking"),
712 },
713 },
714 Some(kind @ ("tool_use" | "server_tool_use")) => Part {
715 session_id: session_id.to_owned(),
716 id: part_id(message_id, ordinal),
717 message_id: message_id.to_owned(),
718 ordinal: part_ordinal(ordinal),
719 provenance: Provenance::Conversational,
720 options: empty_options(),
721 kind: PartKind::ToolCall {
722 call_id: extract_str(value, "id"),
723 name: extract_str(value, "name"),
724 params: value.get("input").cloned().unwrap_or(Value::Null),
725 provider_executed: kind == "server_tool_use",
726 },
727 },
728 Some("image") | Some("file") => file_part(
729 session_id,
730 message_id,
731 ordinal,
732 value,
733 Provenance::Conversational,
734 ),
735 _ => text_part(
736 session_id,
737 message_id,
738 ordinal,
739 Some(extract_compact_repr(value)),
740 Provenance::Conversational,
741 ),
742 }
743}
744
745fn tool_result_part(
746 session_id: &str,
747 message_id: &str,
748 ordinal: usize,
749 value: &Value,
750 source_tool_result: Option<&Value>,
751 tool_call_names: &HashMap<String, Extracted<String>>,
752) -> Part {
753 let call_id = extract_str(value, "tool_use_id");
754 let name = value
757 .str_field("tool_use_id")
758 .and_then(|id| tool_call_names.get(id))
759 .cloned();
760 let result = value
761 .get("content")
762 .cloned()
763 .or_else(|| source_tool_result.cloned())
764 .unwrap_or(Value::Null);
765 Part {
766 session_id: session_id.to_owned(),
767 id: part_id(message_id, ordinal),
768 message_id: message_id.to_owned(),
769 ordinal: part_ordinal(ordinal),
770 provenance: Provenance::Injected,
772 options: empty_options(),
773 kind: PartKind::ToolResult {
774 call_id,
775 name,
776 is_failure: value
777 .get("is_error")
778 .and_then(Value::as_bool)
779 .unwrap_or(false),
780 result,
781 },
782 }
783}
784
785fn file_part(
786 session_id: &str,
787 message_id: &str,
788 ordinal: usize,
789 value: &Value,
790 provenance: Provenance,
791) -> Part {
792 let media_type = value
793 .get("media_type")
794 .or_else(|| value.get("mime_type"))
795 .and_then(Value::as_str)
796 .map(ToOwned::to_owned);
797 let file_name = value
798 .get("file_name")
799 .or_else(|| value.get("name"))
800 .and_then(Value::as_str)
801 .map(ToOwned::to_owned);
802 let data = if let Some(source) = value.get("source") {
803 if let Some(url) = source.get("url").and_then(Value::as_str) {
804 FileData::Url(url.to_owned())
805 } else if let Some(bytes) = source.get("data").and_then(Value::as_str) {
806 FileData::String(bytes.to_owned())
807 } else {
808 FileData::String(compact_json(source))
809 }
810 } else if let Some(url) = value.get("url").and_then(Value::as_str) {
811 FileData::Url(url.to_owned())
812 } else {
813 FileData::String(compact_json(value))
814 };
815 Part {
816 session_id: session_id.to_owned(),
817 id: part_id(message_id, ordinal),
818 message_id: message_id.to_owned(),
819 ordinal: part_ordinal(ordinal),
820 provenance,
821 options: empty_options(),
822 kind: PartKind::File {
823 media_type,
824 file_name,
825 data,
826 },
827 }
828}
829
830fn row_options(record: &Value, line: usize) -> ProviderOptions {
831 let mut options = source_options(NAME, record);
832 if let Some(source) = options.get_mut("source").and_then(Value::as_object_mut) {
833 source.insert("line".to_owned(), json!(line));
834 source.insert("raw_type".to_owned(), json!(record.get("type")));
835 if let Some(subtype) = record.get("subtype") {
836 source.insert("subtype".to_owned(), subtype.clone());
837 }
838 }
839 options
840}
841
842fn assistant_options(record: &Value, message_value: &Value, line: usize) -> ProviderOptions {
843 let mut options = row_options(record, line);
844 let anthropic = json!({
845 "id": message_value.get("id"),
846 "model": message_value.get("model"),
847 "stop_reason": message_value.get("stop_reason"),
848 "usage": message_value.get("usage"),
849 });
850 options.insert("anthropic".to_owned(), anthropic);
851 options
852}
853
854fn signature_options(value: &Value) -> ProviderOptions {
855 let mut options = ProviderOptions::new();
856 if let Some(signature) = value.get("signature").and_then(Value::as_str) {
857 options.insert("anthropic".to_owned(), json!({ "signature": signature }));
858 }
859 options
860}
861
862fn is_tool_result(value: &Value) -> bool {
863 value.get("type").and_then(Value::as_str) == Some("tool_result")
864}
865
866fn serialize_session(
867 session: &crate::sessions::SessionWithMessages,
868 fidelity: RestoreFidelity,
869) -> Result<Vec<RestoredFile>, AdapterError> {
870 let session_raw = raw_record(&session.session.options);
876 if fidelity == RestoreFidelity::Native && session_raw.is_none() {
877 return serialize_session(session, RestoreFidelity::Foreign);
878 }
879
880 let relative_dir = session
881 .session
882 .options
883 .get("source")
884 .and_then(|source| source.get("relative_dir"))
885 .and_then(Value::as_str)
886 .map(PathBuf::from)
887 .unwrap_or_else(|| PathBuf::from(&session.session.id));
888
889 let mut messages = session.messages.clone();
890 if fidelity == RestoreFidelity::Native {
891 messages.sort_by(|left, right| {
892 source_line(left.message.options())
893 .cmp(&source_line(right.message.options()))
894 .then_with(|| by_timestamp_then_id(left, right))
895 });
896 } else {
897 messages.sort_by(by_timestamp_then_id);
898 }
899
900 let mut records = Vec::with_capacity(messages.len());
901 for message in &messages {
902 if fidelity == RestoreFidelity::Native {
903 if let Some(raw) = raw_record(message.message.options()) {
904 records.push(raw);
905 }
906 continue;
907 }
908 if let Some(record) = foreign_record(&session.session.id, message) {
909 records.push(record);
910 }
911 }
912
913 let mut files = vec![RestoredFile::new(
914 relative_dir.join("audit.jsonl"),
915 jsonl_bytes(NAME, &records)?,
916 fidelity,
917 )];
918
919 let meta_value = match fidelity {
921 RestoreFidelity::Native => session_raw,
922 RestoreFidelity::Foreign => Some(foreign_metadata(session)),
923 };
924 if let (Some(meta), Some(parent), Some(dir_name)) = (
925 meta_value,
926 relative_dir.parent(),
927 relative_dir.file_name().and_then(|name| name.to_str()),
928 ) {
929 files.push(RestoredFile::new(
930 parent.join(format!("{dir_name}.json")),
931 serde_json::to_vec(&meta).map_err(|error| {
932 AdapterError::schema(
933 NAME,
934 &session.session.id,
935 format!("json encode failed: {error}"),
936 )
937 })?,
938 fidelity,
939 ));
940 }
941 Ok(files)
942}
943
944fn foreign_metadata(session: &crate::sessions::SessionWithMessages) -> Value {
947 json!({
948 "sessionId": session.session.id,
949 "createdAt": session.session.created_at.timestamp_millis(),
950 "cwd": &*session.session.project,
951 })
952}
953
954fn foreign_record(session_id: &str, message: &crate::sessions::MessageWithParts) -> Option<Value> {
957 let (rtype, role) = match &message.message {
958 Message::User { .. } => ("user", "user"),
959 Message::Assistant { .. } => ("assistant", "assistant"),
960 Message::Tool { .. } => ("user", "user"),
961 Message::System { .. } => return None,
963 };
964 let content = Value::Array(message.parts.iter().map(audit_part).collect());
965 Some(json!({
966 "type": rtype,
967 "session_id": session_id,
968 "uuid": message.message.id(),
969 "message": { "role": role, "content": content },
970 "_audit_timestamp": message
971 .message
972 .timestamp()
973 .to_rfc3339_opts(SecondsFormat::Millis, true),
974 }))
975}
976
977fn audit_part(part: &Part) -> Value {
978 match &part.kind {
979 PartKind::Text { text } => json!({ "type": "text", "text": extracted_text(text) }),
980 PartKind::Reasoning { text } => {
981 json!({ "type": "thinking", "thinking": extracted_text(text) })
982 }
983 PartKind::ToolCall {
984 call_id,
985 name,
986 params,
987 provider_executed,
988 } => json!({
989 "type": if *provider_executed { "server_tool_use" } else { "tool_use" },
990 "id": extracted_text(call_id),
991 "name": extracted_text(name),
992 "input": params,
993 }),
994 PartKind::ToolResult {
995 call_id,
996 is_failure,
997 result,
998 ..
999 } => json!({
1000 "type": "tool_result",
1001 "tool_use_id": extracted_text(call_id),
1002 "is_error": is_failure,
1003 "content": result,
1004 }),
1005 other => json!({
1006 "type": "text",
1007 "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null)),
1008 }),
1009 }
1010}
1011
1012fn source_line(options: &ProviderOptions) -> Option<u64> {
1015 options
1016 .get("source")
1017 .and_then(|source| source.get("line"))
1018 .and_then(Value::as_u64)
1019}
1020
1021#[cfg(test)]
1022mod tests {
1023 #![allow(clippy::expect_used, clippy::unwrap_used)]
1027
1028 use super::*;
1029 use crate::{handlers::ingest_adapter, sessions::Store};
1030 use tempfile::TempDir;
1031
1032 const FIXTURES: &str = concat!(
1035 env!("CARGO_MANIFEST_DIR"),
1036 "/tests/fixtures/adapter/claude_desktop_app/local-agent-mode-sessions"
1037 );
1038 const INNER_LOOP_ID: &str = "a9985b0b-2f5e-4125-b105-7f62376f5509";
1041
1042 #[test]
1043 fn probe_default_finds_cowork_store_under_home() -> anyhow::Result<()> {
1044 crate::adapter::test_support::assert_probe_default(
1045 &ClaudeDesktopAppFactory,
1046 &[
1047 "Library",
1048 "Application Support",
1049 "Claude",
1050 "local-agent-mode-sessions",
1051 ],
1052 )
1053 }
1054
1055 #[tokio::test(flavor = "multi_thread")]
1056 async fn ingests_cowork_fixture_into_canonical_shape() -> anyhow::Result<()> {
1057 let temp = TempDir::new()?;
1058 let store = Store::open_local(temp.path()).await?;
1059 let adapter = ClaudeDesktopAppAdapter::new(FIXTURES);
1060 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1061 assert_eq!(summary.dropped_sessions, 0, "no session-level rejections");
1062
1063 let ids = store.session_ids().await?;
1064 assert_eq!(ids.len(), 4, "exactly the four audit.jsonl sessions");
1067 assert!(
1068 !ids.iter().any(|id| id.contains(INNER_LOOP_ID)),
1069 "the nested inner Claude Code loop must not be ingested as a session",
1070 );
1071 for id in &ids {
1072 assert!(
1073 id.starts_with("local_"),
1074 "session id is the metadata sessionId (local_<uuid>): {id}",
1075 );
1076 }
1077
1078 let mut saw_call = false;
1079 let mut saw_resolved_result = false;
1080 let mut saw_reasoning = false;
1081 let mut saw_system = false;
1082 for id in &ids {
1083 let session = store.get_session(id).await?.expect("session round-trips");
1084 assert_eq!(session.session.source_agent, NAME);
1085 assert!(
1086 !(*session.session.project).is_empty(),
1087 "spec.md#model-project-non-empty",
1088 );
1089 for stored in &session.messages {
1090 if matches!(stored.message, Message::System { .. }) {
1091 saw_system = true;
1092 }
1093 for part in &stored.parts {
1094 match &part.kind {
1095 PartKind::ToolCall { .. } => saw_call = true,
1096 PartKind::ToolResult { name, .. } if name.is_some() => {
1097 saw_resolved_result = true;
1098 }
1099 PartKind::Reasoning { .. } => saw_reasoning = true,
1100 _ => {}
1101 }
1102 }
1103 }
1104 }
1105 assert!(saw_call, "assistant tool_use -> ToolCall");
1106 assert!(
1107 saw_resolved_result,
1108 "tool_result name resolved via the per-session tool_use map",
1109 );
1110 assert!(saw_reasoning, "assistant thinking -> Reasoning");
1111 assert!(
1112 saw_system,
1113 "system/result/... records become System carriers"
1114 );
1115 Ok(())
1116 }
1117
1118 #[tokio::test(flavor = "multi_thread")]
1119 async fn native_restore_round_trips() -> anyhow::Result<()> {
1120 let temp = TempDir::new()?;
1121 let store = Store::open_local(temp.path().join("store")).await?;
1122 let adapter = ClaudeDesktopAppAdapter::new(FIXTURES);
1123 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1124 let original = store.session_ids().await?;
1125
1126 let mut files = Vec::new();
1129 for id in &original {
1130 let session = store.get_session(id).await?.expect("round-trips");
1131 files.extend(ClaudeDesktopAppFactory.serialize(&session, RestoreFidelity::Native)?);
1132 }
1133 let restore_root = temp.path().join("restore");
1134 crate::adapter::write_restored_files(&restore_root, files)?;
1135
1136 let restore_store = Store::open_local(temp.path().join("restore-store")).await?;
1137 let restored = ClaudeDesktopAppAdapter::new(&restore_root);
1138 ingest_adapter(
1139 &restore_store,
1140 &restored,
1141 &crate::adapter::NoopOracle,
1142 |_| {},
1143 )
1144 .await?;
1145 assert_eq!(
1146 restore_store.session_ids().await?.len(),
1147 original.len(),
1148 "native restore re-ingests as the same session set",
1149 );
1150 Ok(())
1151 }
1152}