Skip to main content

pond/adapter/
claude_desktop_app.rs

1//! Claude Desktop app adapter - Cowork / agent-mode sessions only.
2//!
3//! Source path:
4//! `~/Library/Application Support/Claude/local-agent-mode-sessions/<acct>/<workspace>/local_<uuid>/audit.jsonl`
5//! with a sibling `local_<uuid>.json` metadata file. Each `audit.jsonl` is one
6//! session (one JSON object per line); the metadata carries `sessionId`,
7//! `createdAt`, the project folder, and UI fields.
8//!
9//! Scope is deliberately narrow (spec.md#adapters, locked product split):
10//! - It NEVER reads `~/.claude/projects` or the `claude-code-sessions/`
11//!   wrappers - the Desktop "Code" tab writes there in CLI format and is
12//!   covered by `claude-code`.
13//! - It NEVER descends into a session's nested `.claude/` directory. That is
14//!   the inner Claude Code loop the Cowork sandbox runs; `audit.jsonl` already
15//!   represents that conversation, so ingesting the `.claude/projects/**/*.jsonl`
16//!   transcripts under it would double-count the same session. Discovery is
17//!   scoped to `local_*/audit.jsonl` and the walk prunes hidden dirs, so this
18//!   adapter cannot pick up the inner loop. This is why it drives the [`Adapter`]
19//!   seam directly rather than through `JsonlTree` (whose blanket `*.jsonl` walk
20//!   would find the inner transcripts).
21//!
22//! The per-line `message` object is the Anthropic Messages shape (the same
23//! content blocks claude-code carries), so the part mapping mirrors that
24//! adapter. Records that are not a `user`/`assistant` turn (`system`, `result`,
25//! `rate_limit_event`, ...) become System carriers: their `subtype`/`type` is
26//! the content and the verbatim row survives in `options.source.raw_record`, so
27//! native restore reproduces the full `audit.jsonl`. `system/permission_*`
28//! records carry no tool-call id to link an approval to, so they stay System
29//! carriers rather than fabricate the `tool_call_id` a `ToolApproval*` part
30//! requires (spec.md#model-no-synthesis).
31
32use std::{
33    collections::HashMap,
34    path::{Path, PathBuf},
35};
36
37use async_stream::stream;
38use chrono::{DateTime, SecondsFormat, Utc};
39use serde_json::{Value, json};
40use tokio::sync::mpsc;
41use walkdir::WalkDir;
42
43use crate::{
44    sessions::IngestEvent,
45    wire::{FileData, Message, Part, PartKind, Provenance, ProviderOptions, Session},
46};
47
48use super::{
49    Adapter, AdapterError, AdapterFactory, AdapterYield, AdapterYieldStream, DiscoverFuture, Env,
50    RestoreFidelity, RestoredFile, SkipOracle, SkipReason, by_timestamp_then_id, compact_json,
51    config_path, empty_options,
52    extract::{
53        Extracted, Source, bound_value, extract_compact_repr, extract_self_str, extract_str,
54    },
55    extracted_text,
56    jsonl::RECORD_CAP,
57    jsonl_bytes, part_id, part_ordinal, raw_record, source_options,
58};
59
/// Adapter identifier: returned by the factory's `name()` and passed as the
/// first argument to the `AdapterError` constructors and `source_options`
/// calls in this file.
const NAME: &str = "claude-desktop-app";

/// Event-channel bound; doubles as backpressure - the blocking reader parks on
/// `blocking_send` when the consumer lags.
const CHANNEL_CAP: usize = 256;

/// The session-store subpath under `~/Library/Application Support/Claude`;
/// `cowork_root` appends it as the final path component.
const SESSIONS_SUBDIR: &str = "local-agent-mode-sessions";
68
69/// Stateless factory: opens [`ClaudeDesktopAppAdapter`] instances and probes for
70/// the Cowork store under `~/Library/Application Support/Claude`.
71pub struct ClaudeDesktopAppFactory;
72
73impl AdapterFactory for ClaudeDesktopAppFactory {
74    fn name(&self) -> &'static str {
75        NAME
76    }
77
78    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
79        Ok(Box::new(ClaudeDesktopAppAdapter::new(config_path(
80            NAME, config,
81        )?)))
82    }
83
84    fn probe_default(&self, env: &Env) -> Option<Value> {
85        let path = cowork_root(&env.home);
86        path.exists().then(|| json!({ "path": path }))
87    }
88
89    fn serialize(
90        &self,
91        session: &crate::sessions::SessionWithMessages,
92        fidelity: RestoreFidelity,
93    ) -> Result<Vec<RestoredFile>, AdapterError> {
94        serialize_session(session, fidelity)
95    }
96}
97
98/// `~/Library/Application Support/Claude/local-agent-mode-sessions`.
99fn cowork_root(home: &Path) -> PathBuf {
100    home.join("Library")
101        .join("Application Support")
102        .join("Claude")
103        .join(SESSIONS_SUBDIR)
104}
105
/// A Cowork reader bound to one `local-agent-mode-sessions/` directory.
#[derive(Debug, Clone)]
pub struct ClaudeDesktopAppAdapter {
    /// Directory the session walk starts from.
    root: PathBuf,
}

impl ClaudeDesktopAppAdapter {
    /// Build an adapter rooted at `root`; accepts anything convertible into a
    /// `PathBuf`. No filesystem access happens here.
    pub fn new(root: impl Into<PathBuf>) -> Self {
        let root = root.into();
        Self { root }
    }
}
117
118impl Adapter for ClaudeDesktopAppAdapter {
119    fn discover(&self) -> DiscoverFuture<'_> {
120        let root = self.root.clone();
121        Box::pin(async move {
122            tokio::task::spawn_blocking(move || collect_sessions(&root).map(|files| files.len()))
123                .await
124                .map_err(join_error)?
125        })
126    }
127
128    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
129        let adapter = self.clone();
130        Box::pin(stream! {
131            let files = {
132                let root = adapter.root.clone();
133                tokio::task::spawn_blocking(move || collect_sessions(&root)).await
134            };
135            let files = match files {
136                Ok(Ok(files)) => files,
137                Ok(Err(error)) => { yield Err(error); return; }
138                Err(join) => { yield Err(join_error(join)); return; }
139            };
140
141            // Freshness pre-pass: an mtime stat only when the oracle actually
142            // has a watermark for this session - first ingest (or NoopOracle)
143            // has nothing to compare against, so the stat would be wasted.
144            let mut survivors = Vec::with_capacity(files.len());
145            for file in files {
146                if let Some(ingested) = oracle.last_ingested_at(&file.session_id) {
147                    let paths = (file.audit_path.clone(), file.meta_path.clone());
148                    let mtime = tokio::task::spawn_blocking(move || {
149                        newest_mtime(&paths.0).max(newest_mtime(&paths.1))
150                    })
151                    .await;
152                    let mtime = match mtime {
153                        Ok(mtime) => mtime,
154                        Err(join) => { yield Err(join_error(join)); return; }
155                    };
156                    if let Some(mtime) = mtime
157                        && mtime <= ingested
158                    {
159                        yield Ok(AdapterYield::Skipped {
160                            session_id: Some(file.session_id.clone()),
161                            project: None,
162                            reason: SkipReason::Fresh,
163                        });
164                        continue;
165                    }
166                }
167                survivors.push(file);
168            }
169
170            let (tx, mut rx) = mpsc::channel(CHANNEL_CAP);
171            let handle = tokio::task::spawn_blocking(move || read_sessions(survivors, &tx));
172            while let Some(item) = rx.recv().await {
173                yield item;
174            }
175            if let Err(join) = handle.await {
176                yield Err(join_error(join));
177            }
178        })
179    }
180}
181
182/// A blocking-task panic is a pond bug, not bad source data, so it fails the
183/// whole run rather than skipping a session.
184fn join_error(join: tokio::task::JoinError) -> AdapterError {
185    AdapterError::io(
186        NAME,
187        "blocking read task",
188        std::io::Error::other(join.to_string()),
189    )
190}
191
/// One Cowork session located on disk: its `audit.jsonl`, its sibling metadata
/// file, the session id (the `local_<uuid>` directory name, which equals
/// `metadata.sessionId`), and the session dir relative to the root (for restore).
struct CoworkSession {
    /// The `local_<uuid>` directory name; also the key handed to the skip oracle.
    session_id: String,
    /// Full path of the session's `audit.jsonl`.
    audit_path: PathBuf,
    /// `<workspace>/local_<uuid>.json`, next to the session directory.
    meta_path: PathBuf,
    /// Session directory with the adapter root stripped (kept whole if the
    /// strip fails); stored into `options.source.relative_dir`.
    relative_dir: PathBuf,
}
201
202/// Walk the root for `local_*/audit.jsonl`, pruning hidden directories so the
203/// nested `.claude/` inner loop is never reached. Sorted for deterministic
204/// ingest order. A missing root means "no sessions yet", not an error.
205fn collect_sessions(root: &Path) -> Result<Vec<CoworkSession>, AdapterError> {
206    if !root.exists() {
207        return Ok(Vec::new());
208    }
209    let io = |source| AdapterError::io(NAME, root.display().to_string(), source);
210    let mut out = Vec::new();
211    let walker = WalkDir::new(root).into_iter().filter_entry(|entry| {
212        // Prune any hidden dir (`.claude`, `.audit-key` is a file). The inner
213        // Claude Code loop lives under `.claude/`, so this is the structural
214        // guard against double-counting it (spec.md#adapters).
215        !(entry.file_type().is_dir()
216            && entry
217                .file_name()
218                .to_str()
219                .is_some_and(|name| name.starts_with('.')))
220    });
221    for entry in walker {
222        let entry = entry.map_err(|error| io(error.into()))?;
223        if entry.file_name() != "audit.jsonl" {
224            continue;
225        }
226        let audit_path = entry.into_path();
227        let Some(dir) = audit_path.parent() else {
228            continue;
229        };
230        let Some(dir_name) = dir.file_name().and_then(|name| name.to_str()) else {
231            continue;
232        };
233        // The transcript dir is `local_<uuid>`; its name equals
234        // `metadata.sessionId`. A stray `audit.jsonl` elsewhere is not a Cowork
235        // session.
236        if !dir_name.starts_with("local_") {
237            continue;
238        }
239        let Some(workspace) = dir.parent() else {
240            continue;
241        };
242        let meta_path = workspace.join(format!("{dir_name}.json"));
243        let relative_dir = dir.strip_prefix(root).unwrap_or(dir).to_path_buf();
244        out.push(CoworkSession {
245            session_id: dir_name.to_owned(),
246            audit_path,
247            meta_path,
248            relative_dir,
249        });
250    }
251    out.sort_by(|a, b| a.audit_path.cmp(&b.audit_path));
252    Ok(out)
253}
254
255fn newest_mtime(path: &Path) -> Option<DateTime<Utc>> {
256    std::fs::metadata(path)
257        .and_then(|meta| meta.modified())
258        .ok()
259        .map(DateTime::<Utc>::from)
260}
261
262fn read_sessions(
263    sessions: Vec<CoworkSession>,
264    tx: &mpsc::Sender<Result<AdapterYield, AdapterError>>,
265) {
266    for session in sessions {
267        if !read_one_session(session, tx) {
268            return;
269        }
270    }
271}
272
273/// Returns `false` when the consumer dropped the receiver and the read should stop.
274fn read_one_session(
275    file: CoworkSession,
276    tx: &mpsc::Sender<Result<AdapterYield, AdapterError>>,
277) -> bool {
278    macro_rules! emit {
279        ($item:expr) => {
280            if tx.blocking_send($item).is_err() {
281                return false;
282            }
283        };
284    }
285
286    let meta = match read_json(&file.meta_path) {
287        Ok(value) => value,
288        Err(error) => {
289            emit!(Err(error));
290            return true;
291        }
292    };
293    let session = match build_session(&file, &meta) {
294        Ok(session) => session,
295        Err(error) => {
296            emit!(Err(error));
297            return true;
298        }
299    };
300    let created_at = session.created_at;
301    let session_id = session.id.clone();
302    emit!(Ok(AdapterYield::Event(IngestEvent::Session(session))));
303
304    let bytes = match std::fs::read(&file.audit_path) {
305        Ok(bytes) => bytes,
306        Err(error) => {
307            emit!(Err(AdapterError::io(
308                NAME,
309                file.audit_path.display().to_string(),
310                error
311            )));
312            return true;
313        }
314    };
315    let text = match std::str::from_utf8(&bytes) {
316        Ok(text) => text,
317        Err(_) => {
318            emit!(Err(AdapterError::schema(
319                NAME,
320                file.audit_path.display().to_string(),
321                "audit.jsonl is not valid UTF-8",
322            )));
323            return true;
324        }
325    };
326
327    let mut tool_call_names: HashMap<String, Extracted<String>> = HashMap::new();
328    for (index, line) in text.lines().enumerate() {
329        let line_no = index + 1;
330        if line.trim().is_empty() {
331            continue;
332        }
333        if line.len() > RECORD_CAP {
334            emit!(Err(AdapterError::schema(
335                NAME,
336                format!("{}:{line_no}", file.audit_path.display()),
337                format!(
338                    "audit line exceeds adapter record cap: {} bytes > {RECORD_CAP}",
339                    line.len()
340                ),
341            )));
342            continue;
343        }
344        let mut record: Value = match serde_json::from_str(line) {
345            Ok(value) => value,
346            Err(error) => {
347                emit!(Err(AdapterError::parse(
348                    NAME,
349                    file.audit_path.display().to_string(),
350                    line_no,
351                    error,
352                )));
353                continue;
354            }
355        };
356        bound_value(&mut record);
357        capture_tool_call_names(&record, &mut tool_call_names);
358        match record_events(&session_id, line_no, &record, created_at, &tool_call_names) {
359            Ok(events) => {
360                for event in events {
361                    emit!(Ok(AdapterYield::Event(event)));
362                }
363            }
364            Err(message) => emit!(Err(AdapterError::schema(
365                NAME,
366                format!("{}:{line_no}", file.audit_path.display()),
367                message,
368            ))),
369        }
370    }
371    true
372}
373
374/// Read one JSON file whole, bounding every string leaf at the seam cap
375/// (spec.md#adapter-bounded-values).
376fn read_json(path: &Path) -> Result<Value, AdapterError> {
377    use std::io::Read;
378    let io = |source| AdapterError::io(NAME, path.display().to_string(), source);
379    let mut file = std::fs::File::open(path).map_err(io)?;
380    let len = file.metadata().map_err(io)?.len();
381    if len > RECORD_CAP as u64 {
382        return Err(AdapterError::schema(
383            NAME,
384            path.display().to_string(),
385            format!("json file exceeds adapter record cap: {len} bytes > {RECORD_CAP}"),
386        ));
387    }
388    let mut bytes = Vec::with_capacity(len as usize);
389    file.read_to_end(&mut bytes).map_err(io)?;
390    let mut value: Value = serde_json::from_slice(&bytes)
391        .map_err(|error| AdapterError::parse(NAME, path.display().to_string(), 1, error))?;
392    bound_value(&mut value);
393    Ok(value)
394}
395
396fn build_session(file: &CoworkSession, meta: &Value) -> Result<Session, AdapterError> {
397    let display = file.meta_path.display().to_string();
398    // The `local_<uuid>` dir name is authoritative for the id (it equals
399    // `metadata.sessionId`) AND is what the freshness oracle is keyed on in
400    // events_with, so use it directly to keep the two in lockstep - a divergence
401    // would silently disable the freshness skip. The raw metadata (with its own
402    // `sessionId`) is preserved in options.source.raw_record.
403    let session_id = file.session_id.clone();
404
405    let created_at = meta
406        .get("createdAt")
407        .and_then(Value::as_i64)
408        .and_then(DateTime::from_timestamp_millis)
409        .ok_or_else(|| {
410            AdapterError::schema(
411                NAME,
412                display.clone(),
413                "metadata missing numeric `createdAt`",
414            )
415        })?;
416
417    // spec.md#model-project-non-empty: prefer the real folder the user opened
418    // (`userSelectedFolders[0]`), else the sandbox `cwd` (always present). Both
419    // are extracted from real source data, never synthesized.
420    let project = meta
421        .get("userSelectedFolders")
422        .and_then(Value::as_array)
423        .and_then(|folders| folders.first())
424        .filter(|first| first.as_str().is_some_and(|s| !s.is_empty()))
425        .and_then(|first| extract_self_str(first))
426        .or_else(|| extract_str(meta, "cwd").filter(|cwd| !cwd.trim().is_empty()))
427        .ok_or_else(|| {
428            AdapterError::schema(
429                NAME,
430                display,
431                "metadata has neither `userSelectedFolders[0]` nor `cwd` for the project",
432            )
433        })?;
434
435    let mut options = source_options(NAME, meta);
436    if let Some(source) = options.get_mut("source").and_then(Value::as_object_mut) {
437        source.insert(
438            "relative_dir".to_owned(),
439            json!(file.relative_dir.to_string_lossy()),
440        );
441        for key in [
442            "model",
443            "title",
444            "cliSessionId",
445            "systemPrompt",
446            "initialMessage",
447            "enabledMcpTools",
448            "vmProcessName",
449            "accountName",
450        ] {
451            if let Some(value) = meta.get(key) {
452                source.insert(key.to_owned(), value.clone());
453            }
454        }
455    }
456
457    Ok(Session {
458        id: session_id,
459        parent_session_id: None,
460        parent_message_id: None,
461        source_agent: NAME.to_owned(),
462        created_at,
463        project,
464        options,
465    })
466}
467
468/// Stash every `tool_use` block's `id -> name` from an assistant record's
469/// `message.content[]`, so a later `tool_result` row can resolve its name.
470/// Idempotent and safe on any record (non-assistant rows contribute nothing).
471fn capture_tool_call_names(record: &Value, map: &mut HashMap<String, Extracted<String>>) {
472    let Some(items) = record
473        .get("message")
474        .and_then(|message| message.get("content"))
475        .and_then(Value::as_array)
476    else {
477        return;
478    };
479    for item in items {
480        if !matches!(
481            item.get("type").and_then(Value::as_str),
482            Some("tool_use") | Some("server_tool_use")
483        ) {
484            continue;
485        }
486        let (Some(id), Some(name)) = (item.str_field("id"), extract_str(item, "name")) else {
487            continue;
488        };
489        map.insert(id.to_owned(), name);
490    }
491}
492
493/// Map one audit record into canonical events. `user`/`assistant` records carry
494/// an inner Anthropic `message`; everything else becomes a System carrier whose
495/// verbatim row survives in `options.source.raw_record` for lossless restore.
496fn record_events(
497    session_id: &str,
498    line: usize,
499    record: &Value,
500    default_timestamp: DateTime<Utc>,
501    tool_call_names: &HashMap<String, Extracted<String>>,
502) -> Result<Vec<IngestEvent>, String> {
503    let timestamp = record_timestamp(record).unwrap_or(default_timestamp);
504    let uuid = record
505        .get("uuid")
506        .and_then(Value::as_str)
507        .map_or_else(|| format!("{session_id}:{line}"), ToOwned::to_owned);
508    let rtype = record.get("type").and_then(Value::as_str);
509
510    match rtype {
511        Some("user") | Some("assistant") => {
512            let message_value = record.get("message").unwrap_or(&Value::Null);
513            message_events(
514                session_id,
515                &uuid,
516                timestamp,
517                record,
518                message_value,
519                tool_call_names,
520                line,
521            )
522        }
523        // `system` (init/status/api_retry/permission_*), `result`,
524        // `rate_limit_event`, `tool_use_summary`, and any future record type
525        // are kept as System carriers (spec.md#adapter-integrity-no-silent-drops).
526        _ => {
527            let content = extract_str(record, "subtype").or_else(|| extract_str(record, "type"));
528            Ok(vec![IngestEvent::Message(Message::System {
529                id: uuid,
530                session_id: session_id.to_owned(),
531                timestamp,
532                content,
533                options: row_options(record, line),
534            })])
535        }
536    }
537}
538
539fn record_timestamp(record: &Value) -> Option<DateTime<Utc>> {
540    record
541        .get("_audit_timestamp")
542        .or_else(|| record.get("timestamp"))
543        .and_then(Value::as_str)
544        .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
545        .map(|dt| dt.with_timezone(&Utc))
546}
547
548fn message_events(
549    session_id: &str,
550    uuid: &str,
551    timestamp: DateTime<Utc>,
552    record: &Value,
553    message_value: &Value,
554    tool_call_names: &HashMap<String, Extracted<String>>,
555    line: usize,
556) -> Result<Vec<IngestEvent>, String> {
557    let role = message_value
558        .get("role")
559        .and_then(Value::as_str)
560        .ok_or_else(|| "message missing role".to_owned())?;
561    let content = message_value.get("content").unwrap_or(&Value::Null);
562    let mut parts = Vec::new();
563    let message = match (role, content) {
564        ("user", Value::String(_)) => {
565            parts.push(text_part(
566                session_id,
567                uuid,
568                0,
569                extract_self_str(content),
570                Provenance::Conversational,
571            ));
572            Message::User {
573                id: uuid.to_owned(),
574                session_id: session_id.to_owned(),
575                timestamp,
576                options: row_options(record, line),
577            }
578        }
579        ("user", Value::Array(items)) if !items.is_empty() && items.iter().all(is_tool_result) => {
580            let source_tool_result = record.get("tool_use_result").cloned();
581            parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
582                tool_result_part(
583                    session_id,
584                    uuid,
585                    ordinal,
586                    item,
587                    source_tool_result.as_ref(),
588                    tool_call_names,
589                )
590            }));
591            Message::Tool {
592                id: uuid.to_owned(),
593                session_id: session_id.to_owned(),
594                timestamp,
595                options: row_options(record, line),
596            }
597        }
598        ("user", Value::Array(items)) => {
599            parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
600                user_part(session_id, uuid, ordinal, item, tool_call_names)
601            }));
602            Message::User {
603                id: uuid.to_owned(),
604                session_id: session_id.to_owned(),
605                timestamp,
606                options: row_options(record, line),
607            }
608        }
609        ("assistant", Value::Array(items)) => {
610            parts.extend(
611                items
612                    .iter()
613                    .enumerate()
614                    .map(|(ordinal, item)| assistant_part(session_id, uuid, ordinal, item)),
615            );
616            Message::Assistant {
617                id: uuid.to_owned(),
618                session_id: session_id.to_owned(),
619                timestamp,
620                options: assistant_options(record, message_value, line),
621            }
622        }
623        (other, _) => {
624            return Err(format!("unsupported message role {other}"));
625        }
626    };
627
628    let mut events = Vec::with_capacity(parts.len() + 1);
629    events.push(IngestEvent::Message(message));
630    events.extend(parts.into_iter().map(IngestEvent::Part));
631    Ok(events)
632}
633
634fn text_part(
635    session_id: &str,
636    message_id: &str,
637    ordinal: usize,
638    text: Option<Extracted<String>>,
639    provenance: Provenance,
640) -> Part {
641    Part {
642        session_id: session_id.to_owned(),
643        id: part_id(message_id, ordinal),
644        message_id: message_id.to_owned(),
645        ordinal: part_ordinal(ordinal),
646        provenance,
647        options: empty_options(),
648        kind: PartKind::Text { text },
649    }
650}
651
652fn user_part(
653    session_id: &str,
654    message_id: &str,
655    ordinal: usize,
656    value: &Value,
657    tool_call_names: &HashMap<String, Extracted<String>>,
658) -> Part {
659    match value.get("type").and_then(Value::as_str) {
660        Some("text") => text_part(
661            session_id,
662            message_id,
663            ordinal,
664            extract_str(value, "text"),
665            Provenance::Conversational,
666        ),
667        Some("image") | Some("file") => file_part(
668            session_id,
669            message_id,
670            ordinal,
671            value,
672            Provenance::Conversational,
673        ),
674        Some("tool_result") => tool_result_part(
675            session_id,
676            message_id,
677            ordinal,
678            value,
679            None,
680            tool_call_names,
681        ),
682        // Unknown user block: preserve the raw JSON in a Text slot rather than
683        // drop it - a lossless encoding, not a synthesized value.
684        _ => text_part(
685            session_id,
686            message_id,
687            ordinal,
688            Some(extract_compact_repr(value)),
689            Provenance::Conversational,
690        ),
691    }
692}
693
694fn assistant_part(session_id: &str, message_id: &str, ordinal: usize, value: &Value) -> Part {
695    match value.get("type").and_then(Value::as_str) {
696        Some("text") => text_part(
697            session_id,
698            message_id,
699            ordinal,
700            extract_str(value, "text"),
701            Provenance::Conversational,
702        ),
703        Some("thinking") => Part {
704            session_id: session_id.to_owned(),
705            id: part_id(message_id, ordinal),
706            message_id: message_id.to_owned(),
707            ordinal: part_ordinal(ordinal),
708            provenance: Provenance::Conversational,
709            options: signature_options(value),
710            kind: PartKind::Reasoning {
711                text: extract_str(value, "thinking"),
712            },
713        },
714        Some(kind @ ("tool_use" | "server_tool_use")) => Part {
715            session_id: session_id.to_owned(),
716            id: part_id(message_id, ordinal),
717            message_id: message_id.to_owned(),
718            ordinal: part_ordinal(ordinal),
719            provenance: Provenance::Conversational,
720            options: empty_options(),
721            kind: PartKind::ToolCall {
722                call_id: extract_str(value, "id"),
723                name: extract_str(value, "name"),
724                params: value.get("input").cloned().unwrap_or(Value::Null),
725                provider_executed: kind == "server_tool_use",
726            },
727        },
728        Some("image") | Some("file") => file_part(
729            session_id,
730            message_id,
731            ordinal,
732            value,
733            Provenance::Conversational,
734        ),
735        _ => text_part(
736            session_id,
737            message_id,
738            ordinal,
739            Some(extract_compact_repr(value)),
740            Provenance::Conversational,
741        ),
742    }
743}
744
745fn tool_result_part(
746    session_id: &str,
747    message_id: &str,
748    ordinal: usize,
749    value: &Value,
750    source_tool_result: Option<&Value>,
751    tool_call_names: &HashMap<String, Extracted<String>>,
752) -> Part {
753    let call_id = extract_str(value, "tool_use_id");
754    // The name lives on the prior `tool_use`, resolved via the per-session map;
755    // a miss surfaces as `None`, never a sentinel (spec.md#model-no-synthesis).
756    let name = value
757        .str_field("tool_use_id")
758        .and_then(|id| tool_call_names.get(id))
759        .cloned();
760    let result = value
761        .get("content")
762        .cloned()
763        .or_else(|| source_tool_result.cloned())
764        .unwrap_or(Value::Null);
765    Part {
766        session_id: session_id.to_owned(),
767        id: part_id(message_id, ordinal),
768        message_id: message_id.to_owned(),
769        ordinal: part_ordinal(ordinal),
770        // spec.md#model-part-provenance: tool output is runtime-produced.
771        provenance: Provenance::Injected,
772        options: empty_options(),
773        kind: PartKind::ToolResult {
774            call_id,
775            name,
776            is_failure: value
777                .get("is_error")
778                .and_then(Value::as_bool)
779                .unwrap_or(false),
780            result,
781        },
782    }
783}
784
785fn file_part(
786    session_id: &str,
787    message_id: &str,
788    ordinal: usize,
789    value: &Value,
790    provenance: Provenance,
791) -> Part {
792    let media_type = value
793        .get("media_type")
794        .or_else(|| value.get("mime_type"))
795        .and_then(Value::as_str)
796        .map(ToOwned::to_owned);
797    let file_name = value
798        .get("file_name")
799        .or_else(|| value.get("name"))
800        .and_then(Value::as_str)
801        .map(ToOwned::to_owned);
802    let data = if let Some(source) = value.get("source") {
803        if let Some(url) = source.get("url").and_then(Value::as_str) {
804            FileData::Url(url.to_owned())
805        } else if let Some(bytes) = source.get("data").and_then(Value::as_str) {
806            FileData::String(bytes.to_owned())
807        } else {
808            FileData::String(compact_json(source))
809        }
810    } else if let Some(url) = value.get("url").and_then(Value::as_str) {
811        FileData::Url(url.to_owned())
812    } else {
813        FileData::String(compact_json(value))
814    };
815    Part {
816        session_id: session_id.to_owned(),
817        id: part_id(message_id, ordinal),
818        message_id: message_id.to_owned(),
819        ordinal: part_ordinal(ordinal),
820        provenance,
821        options: empty_options(),
822        kind: PartKind::File {
823            media_type,
824            file_name,
825            data,
826        },
827    }
828}
829
830fn row_options(record: &Value, line: usize) -> ProviderOptions {
831    let mut options = source_options(NAME, record);
832    if let Some(source) = options.get_mut("source").and_then(Value::as_object_mut) {
833        source.insert("line".to_owned(), json!(line));
834        source.insert("raw_type".to_owned(), json!(record.get("type")));
835        if let Some(subtype) = record.get("subtype") {
836            source.insert("subtype".to_owned(), subtype.clone());
837        }
838    }
839    options
840}
841
842fn assistant_options(record: &Value, message_value: &Value, line: usize) -> ProviderOptions {
843    let mut options = row_options(record, line);
844    let anthropic = json!({
845        "id": message_value.get("id"),
846        "model": message_value.get("model"),
847        "stop_reason": message_value.get("stop_reason"),
848        "usage": message_value.get("usage"),
849    });
850    options.insert("anthropic".to_owned(), anthropic);
851    options
852}
853
854fn signature_options(value: &Value) -> ProviderOptions {
855    let mut options = ProviderOptions::new();
856    if let Some(signature) = value.get("signature").and_then(Value::as_str) {
857        options.insert("anthropic".to_owned(), json!({ "signature": signature }));
858    }
859    options
860}
861
862fn is_tool_result(value: &Value) -> bool {
863    value.get("type").and_then(Value::as_str) == Some("tool_result")
864}
865
866fn serialize_session(
867    session: &crate::sessions::SessionWithMessages,
868    fidelity: RestoreFidelity,
869) -> Result<Vec<RestoredFile>, AdapterError> {
870    // Native restore replays the verbatim `audit.jsonl` rows (each message's
871    // stored `raw_record`) in source-line order, plus the metadata file from the
872    // session's stored `raw_record`. spec.md#adapter-native-restore-lossless:
873    // a session ingested without raw records (foreign-sourced) can't be replayed
874    // natively, so we re-enter forcing Foreign (which stamps actual_fidelity).
875    let session_raw = raw_record(&session.session.options);
876    if fidelity == RestoreFidelity::Native && session_raw.is_none() {
877        return serialize_session(session, RestoreFidelity::Foreign);
878    }
879
880    let relative_dir = session
881        .session
882        .options
883        .get("source")
884        .and_then(|source| source.get("relative_dir"))
885        .and_then(Value::as_str)
886        .map(PathBuf::from)
887        .unwrap_or_else(|| PathBuf::from(&session.session.id));
888
889    let mut messages = session.messages.clone();
890    if fidelity == RestoreFidelity::Native {
891        messages.sort_by(|left, right| {
892            source_line(left.message.options())
893                .cmp(&source_line(right.message.options()))
894                .then_with(|| by_timestamp_then_id(left, right))
895        });
896    } else {
897        messages.sort_by(by_timestamp_then_id);
898    }
899
900    let mut records = Vec::with_capacity(messages.len());
901    for message in &messages {
902        if fidelity == RestoreFidelity::Native {
903            if let Some(raw) = raw_record(message.message.options()) {
904                records.push(raw);
905            }
906            continue;
907        }
908        if let Some(record) = foreign_record(&session.session.id, message) {
909            records.push(record);
910        }
911    }
912
913    let mut files = vec![RestoredFile::new(
914        relative_dir.join("audit.jsonl"),
915        jsonl_bytes(NAME, &records)?,
916        fidelity,
917    )];
918
919    // The metadata sits a level up, named after the session dir: `local_x.json`.
920    let meta_value = match fidelity {
921        RestoreFidelity::Native => session_raw,
922        RestoreFidelity::Foreign => Some(foreign_metadata(session)),
923    };
924    if let (Some(meta), Some(parent), Some(dir_name)) = (
925        meta_value,
926        relative_dir.parent(),
927        relative_dir.file_name().and_then(|name| name.to_str()),
928    ) {
929        files.push(RestoredFile::new(
930            parent.join(format!("{dir_name}.json")),
931            serde_json::to_vec(&meta).map_err(|error| {
932                AdapterError::schema(
933                    NAME,
934                    &session.session.id,
935                    format!("json encode failed: {error}"),
936                )
937            })?,
938            fidelity,
939        ));
940    }
941    Ok(files)
942}
943
944/// Best-effort metadata for a foreign session: the fields `build_session`
945/// reads back, derived from canonical data.
946fn foreign_metadata(session: &crate::sessions::SessionWithMessages) -> Value {
947    json!({
948        "sessionId": session.session.id,
949        "createdAt": session.session.created_at.timestamp_millis(),
950        "cwd": &*session.session.project,
951    })
952}
953
954/// Best-effort `audit.jsonl` row for a foreign session, mirroring the Anthropic
955/// `message` envelope this adapter reads.
956fn foreign_record(session_id: &str, message: &crate::sessions::MessageWithParts) -> Option<Value> {
957    let (rtype, role) = match &message.message {
958        Message::User { .. } => ("user", "user"),
959        Message::Assistant { .. } => ("assistant", "assistant"),
960        Message::Tool { .. } => ("user", "user"),
961        // System carriers have no idiomatic audit row; content stays canonical.
962        Message::System { .. } => return None,
963    };
964    let content = Value::Array(message.parts.iter().map(audit_part).collect());
965    Some(json!({
966        "type": rtype,
967        "session_id": session_id,
968        "uuid": message.message.id(),
969        "message": { "role": role, "content": content },
970        "_audit_timestamp": message
971            .message
972            .timestamp()
973            .to_rfc3339_opts(SecondsFormat::Millis, true),
974    }))
975}
976
977fn audit_part(part: &Part) -> Value {
978    match &part.kind {
979        PartKind::Text { text } => json!({ "type": "text", "text": extracted_text(text) }),
980        PartKind::Reasoning { text } => {
981            json!({ "type": "thinking", "thinking": extracted_text(text) })
982        }
983        PartKind::ToolCall {
984            call_id,
985            name,
986            params,
987            provider_executed,
988        } => json!({
989            "type": if *provider_executed { "server_tool_use" } else { "tool_use" },
990            "id": extracted_text(call_id),
991            "name": extracted_text(name),
992            "input": params,
993        }),
994        PartKind::ToolResult {
995            call_id,
996            is_failure,
997            result,
998            ..
999        } => json!({
1000            "type": "tool_result",
1001            "tool_use_id": extracted_text(call_id),
1002            "is_error": is_failure,
1003            "content": result,
1004        }),
1005        other => json!({
1006            "type": "text",
1007            "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null)),
1008        }),
1009    }
1010}
1011
1012/// Read the stored source line for a message (`options.source.line`), used to
1013/// replay `audit.jsonl` rows in their original order on native restore.
1014fn source_line(options: &ProviderOptions) -> Option<u64> {
1015    options
1016        .get("source")
1017        .and_then(|source| source.get("line"))
1018        .and_then(Value::as_u64)
1019}
1020
#[cfg(test)]
mod tests {
    //! End-to-end tests over the committed Cowork fixture corpus
    //! (`tests/fixtures/adapter/claude_desktop_app/`), including the regression
    //! guard that the nested `.claude/` inner Claude Code loop is never ingested.
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use crate::{handlers::ingest_adapter, sessions::Store};
    use tempfile::TempDir;

    // Anchored at the manifest dir so unit tests never depend on the process
    // cwd (figment::Jail chdirs the whole test process while config tests run).
    const FIXTURES: &str = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/fixtures/adapter/claude_desktop_app/local-agent-mode-sessions"
    );
    /// Id of the inner Claude Code loop transcript nested under one session's
    /// `.claude/projects/`; it must never show up as a session of its own.
    const INNER_LOOP_ID: &str = "a9985b0b-2f5e-4125-b105-7f62376f5509";

    /// The factory's default probe resolves to the Cowork store under home.
    #[test]
    fn probe_default_finds_cowork_store_under_home() -> anyhow::Result<()> {
        let expected_segments = [
            "Library",
            "Application Support",
            "Claude",
            "local-agent-mode-sessions",
        ];
        crate::adapter::test_support::assert_probe_default(
            &ClaudeDesktopAppFactory,
            &expected_segments,
        )
    }

    /// Ingesting the fixture corpus yields exactly the `audit.jsonl` sessions
    /// and exercises every canonical mapping the adapter performs.
    #[tokio::test(flavor = "multi_thread")]
    async fn ingests_cowork_fixture_into_canonical_shape() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let adapter = ClaudeDesktopAppAdapter::new(FIXTURES);
        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
        assert_eq!(summary.dropped_sessions, 0, "no session-level rejections");

        let ids = store.session_ids().await?;
        // Four `audit.jsonl` sessions; the nested `.claude/**/*.jsonl` inner loop
        // must NOT become a fifth session (spec.md#adapters double-count guard).
        assert_eq!(ids.len(), 4, "exactly the four audit.jsonl sessions");
        assert!(
            ids.iter().all(|id| !id.contains(INNER_LOOP_ID)),
            "the nested inner Claude Code loop must not be ingested as a session",
        );
        for id in &ids {
            assert!(
                id.starts_with("local_"),
                "session id is the metadata sessionId (local_<uuid>): {id}",
            );
        }

        // Each flag flips once the corresponding canonical shape is observed
        // anywhere in the ingested corpus.
        let mut saw_call = false;
        let mut saw_resolved_result = false;
        let mut saw_reasoning = false;
        let mut saw_system = false;
        for id in &ids {
            let session = store.get_session(id).await?.expect("session round-trips");
            assert_eq!(session.session.source_agent, NAME);
            assert!(
                !(*session.session.project).is_empty(),
                "spec.md#model-project-non-empty",
            );
            for stored in &session.messages {
                saw_system |= matches!(stored.message, Message::System { .. });
                for part in &stored.parts {
                    match &part.kind {
                        PartKind::ToolCall { .. } => saw_call = true,
                        PartKind::ToolResult { name: Some(_), .. } => {
                            saw_resolved_result = true;
                        }
                        PartKind::Reasoning { .. } => saw_reasoning = true,
                        _ => {}
                    }
                }
            }
        }
        assert!(saw_call, "assistant tool_use -> ToolCall");
        assert!(
            saw_resolved_result,
            "tool_result name resolved via the per-session tool_use map",
        );
        assert!(saw_reasoning, "assistant thinking -> Reasoning");
        assert!(
            saw_system,
            "system/result/... records become System carriers"
        );
        Ok(())
    }

    /// Natively restoring every ingested session and re-ingesting the written
    /// tree produces the same number of sessions.
    #[tokio::test(flavor = "multi_thread")]
    async fn native_restore_round_trips() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path().join("store")).await?;
        let adapter = ClaudeDesktopAppAdapter::new(FIXTURES);
        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
        let original = store.session_ids().await?;

        // Native restore replays each session's audit.jsonl + metadata; gather
        // every file first (distinct dirs, no collision), then write the tree once.
        let mut restored_files = Vec::new();
        for id in &original {
            let session = store.get_session(id).await?.expect("round-trips");
            let serialized = ClaudeDesktopAppFactory.serialize(&session, RestoreFidelity::Native)?;
            restored_files.extend(serialized);
        }
        let restore_root = temp.path().join("restore");
        crate::adapter::write_restored_files(&restore_root, restored_files)?;

        let restore_store = Store::open_local(temp.path().join("restore-store")).await?;
        let restored = ClaudeDesktopAppAdapter::new(&restore_root);
        ingest_adapter(
            &restore_store,
            &restored,
            &crate::adapter::NoopOracle,
            |_| {},
        )
        .await?;
        let restored_ids = restore_store.session_ids().await?;
        assert_eq!(
            restored_ids.len(),
            original.len(),
            "native restore re-ingests as the same session set",
        );
        Ok(())
    }
}