Skip to main content

pond/adapter/
claude_desktop_app.rs

1//! Claude Desktop app adapter - Cowork / agent-mode sessions only.
2//!
3//! Source path:
4//! `~/Library/Application Support/Claude/local-agent-mode-sessions/<acct>/<workspace>/local_<uuid>/audit.jsonl`
5//! with a sibling `local_<uuid>.json` metadata file. Each `audit.jsonl` is one
6//! session (one JSON object per line); the metadata carries `sessionId`,
7//! `createdAt`, the project folder, and UI fields.
8//!
9//! Scope is deliberately narrow (spec.md#adapters, locked product split):
10//! - It NEVER reads `~/.claude/projects` or the `claude-code-sessions/`
11//!   wrappers - the Desktop "Code" tab writes there in CLI format and is
12//!   covered by `claude-code`.
13//! - It NEVER descends into a session's nested `.claude/` directory. That is
14//!   the inner Claude Code loop the Cowork sandbox runs; `audit.jsonl` already
15//!   represents that conversation, so ingesting the `.claude/projects/**/*.jsonl`
16//!   transcripts under it would double-count the same session. Discovery is
17//!   scoped to `local_*/audit.jsonl` and the walk prunes hidden dirs, so this
18//!   adapter cannot pick up the inner loop. This is why it drives the [`Adapter`]
19//!   seam directly rather than through `JsonlTree` (whose blanket `*.jsonl` walk
20//!   would find the inner transcripts).
21//!
22//! The per-line `message` object is the Anthropic Messages shape (the same
23//! content blocks claude-code carries), so the part mapping mirrors that
24//! adapter. Records that are not a `user`/`assistant` turn (`system`, `result`,
25//! `rate_limit_event`, ...) become System carriers: their `subtype`/`type` is
26//! the content and the verbatim row survives in `options.source.raw_record`, so
27//! native restore reproduces the full `audit.jsonl`. `system/permission_*`
28//! records carry no tool-call id to link an approval to, so they stay System
29//! carriers rather than fabricate the `tool_call_id` a `ToolApproval*` part
30//! requires (spec.md#model-no-synthesis).
31
32use std::{
33    collections::HashMap,
34    path::{Path, PathBuf},
35};
36
37use async_stream::stream;
38use chrono::{DateTime, SecondsFormat, Utc};
39use serde_json::{Value, json};
40use tokio::sync::mpsc;
41use walkdir::WalkDir;
42
43use crate::{
44    sessions::IngestEvent,
45    wire::{FileData, Message, Part, PartKind, Provenance, ProviderOptions, Session},
46};
47
48use super::{
49    Adapter, AdapterError, AdapterFactory, AdapterYield, AdapterYieldStream, DiscoverFuture, Env,
50    RestoreFidelity, RestoredFile, SkipOracle, SkipReason, by_timestamp_then_id, compact_json,
51    config_path, empty_options,
52    extract::{
53        Extracted, Source, bound_value, extract_compact_repr, extract_self_str, extract_str,
54    },
55    extracted_text,
56    jsonl::{RECORD_CAP, peek_last_line},
57    jsonl_bytes, part_id, part_ordinal, raw_record, source_options,
58};
59
60const NAME: &str = "claude-desktop-app";
61
62/// Event-channel bound; doubles as backpressure - the blocking reader parks on
63/// `blocking_send` when the consumer lags.
64const CHANNEL_CAP: usize = 256;
65
66/// The session-store subpath under `~/Library/Application Support/Claude`.
67const SESSIONS_SUBDIR: &str = "local-agent-mode-sessions";
68
69/// Stateless factory: opens [`ClaudeDesktopAppAdapter`] instances and probes for
70/// the Cowork store under `~/Library/Application Support/Claude`.
71pub struct ClaudeDesktopAppFactory;
72
73impl AdapterFactory for ClaudeDesktopAppFactory {
74    fn name(&self) -> &'static str {
75        NAME
76    }
77
78    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
79        Ok(Box::new(ClaudeDesktopAppAdapter::new(config_path(
80            NAME, config,
81        )?)))
82    }
83
84    fn probe_default(&self, env: &Env) -> Option<Value> {
85        let path = cowork_root(&env.home);
86        path.exists().then(|| json!({ "path": path }))
87    }
88
89    fn serialize(
90        &self,
91        session: &crate::sessions::SessionWithMessages,
92        fidelity: RestoreFidelity,
93    ) -> Result<Vec<RestoredFile>, AdapterError> {
94        serialize_session(session, fidelity)
95    }
96}
97
98/// `~/Library/Application Support/Claude/local-agent-mode-sessions`.
99fn cowork_root(home: &Path) -> PathBuf {
100    home.join("Library")
101        .join("Application Support")
102        .join("Claude")
103        .join(SESSIONS_SUBDIR)
104}
105
/// Cowork reader bound to one `local-agent-mode-sessions/` directory.
#[derive(Debug, Clone)]
pub struct ClaudeDesktopAppAdapter {
    /// Directory the session walk starts from.
    root: PathBuf,
}

impl ClaudeDesktopAppAdapter {
    /// Creates an adapter rooted at `root`; anything convertible into a
    /// `PathBuf` is accepted.
    pub fn new(root: impl Into<PathBuf>) -> Self {
        let root = root.into();
        Self { root }
    }
}
117
118impl Adapter for ClaudeDesktopAppAdapter {
119    fn discover(&self) -> DiscoverFuture<'_> {
120        let root = self.root.clone();
121        Box::pin(async move {
122            tokio::task::spawn_blocking(move || collect_sessions(&root).map(|files| files.len()))
123                .await
124                .map_err(join_error)?
125        })
126    }
127
128    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
129        let adapter = self.clone();
130        Box::pin(stream! {
131            let files = {
132                let root = adapter.root.clone();
133                tokio::task::spawn_blocking(move || collect_sessions(&root)).await
134            };
135            let files = match files {
136                Ok(Ok(files)) => files,
137                Ok(Err(error)) => { yield Err(error); return; }
138                Err(join) => { yield Err(join_error(join)); return; }
139            };
140
141            // Freshness pre-pass: read the audit log's last-message timestamp (its
142            // tail line) and skip when it is no newer than pond's watermark. Only
143            // when the oracle has entries - a first ingest has nothing to compare.
144            let mut survivors = Vec::with_capacity(files.len());
145            for file in files {
146                if !oracle.is_empty() {
147                    let audit = file.audit_path.clone();
148                    let last_ts =
149                        match tokio::task::spawn_blocking(move || source_last_ts(&audit)).await {
150                            Ok(last_ts) => last_ts,
151                            Err(join) => { yield Err(join_error(join)); return; }
152                        };
153                    if crate::adapter::is_session_fresh(oracle, &file.session_id, last_ts) {
154                        yield Ok(AdapterYield::Skipped {
155                            session_id: Some(file.session_id.clone()),
156                            project: None,
157                            reason: SkipReason::Fresh,
158                        });
159                        continue;
160                    }
161                }
162                survivors.push(file);
163            }
164
165            let (tx, mut rx) = mpsc::channel(CHANNEL_CAP);
166            let handle = tokio::task::spawn_blocking(move || read_sessions(survivors, &tx));
167            while let Some(item) = rx.recv().await {
168                yield item;
169            }
170            if let Err(join) = handle.await {
171                yield Err(join_error(join));
172            }
173        })
174    }
175}
176
177/// A blocking-task panic is a pond bug, not bad source data, so it fails the
178/// whole run rather than skipping a session.
179/// Latest message timestamp (micros) for the freshness gate: the audit log's
180/// last record. The audit stream is append-ordered, so its tail line is the
181/// latest message; an unreadable file or a record without a timestamp yields
182/// `None` and the session re-reads (safe). The sibling metadata file is not
183/// consulted - pond never rewrites an existing session row, so a pure-metadata
184/// change is a no-op.
185fn source_last_ts(audit_path: &Path) -> Option<i64> {
186    let last_line = peek_last_line(audit_path)?;
187    let record: Value = serde_json::from_str(&last_line).ok()?;
188    Some(record_timestamp(&record)?.timestamp_micros())
189}
190
191fn join_error(join: tokio::task::JoinError) -> AdapterError {
192    AdapterError::io(
193        NAME,
194        "blocking read task",
195        std::io::Error::other(join.to_string()),
196    )
197}
198
/// A Cowork session as found on disk by the directory walk.
struct CoworkSession {
    /// The `local_<uuid>` directory name, used as the session id (it equals
    /// `metadata.sessionId`).
    session_id: String,
    /// Path of the session's `audit.jsonl`.
    audit_path: PathBuf,
    /// Path of the sibling `local_<uuid>.json` metadata file.
    meta_path: PathBuf,
    /// Session directory relative to the adapter root (kept for restore).
    relative_dir: PathBuf,
}
208
209/// Walk the root for `local_*/audit.jsonl`, pruning hidden directories so the
210/// nested `.claude/` inner loop is never reached. Sorted for deterministic
211/// ingest order. A missing root means "no sessions yet", not an error.
212fn collect_sessions(root: &Path) -> Result<Vec<CoworkSession>, AdapterError> {
213    if !root.exists() {
214        return Ok(Vec::new());
215    }
216    let io = |source| AdapterError::io(NAME, root.display().to_string(), source);
217    let mut out = Vec::new();
218    let walker = WalkDir::new(root).into_iter().filter_entry(|entry| {
219        // Prune any hidden dir (`.claude`, `.audit-key` is a file). The inner
220        // Claude Code loop lives under `.claude/`, so this is the structural
221        // guard against double-counting it (spec.md#adapters).
222        !(entry.file_type().is_dir()
223            && entry
224                .file_name()
225                .to_str()
226                .is_some_and(|name| name.starts_with('.')))
227    });
228    for entry in walker {
229        let entry = entry.map_err(|error| io(error.into()))?;
230        if entry.file_name() != "audit.jsonl" {
231            continue;
232        }
233        let audit_path = entry.into_path();
234        let Some(dir) = audit_path.parent() else {
235            continue;
236        };
237        let Some(dir_name) = dir.file_name().and_then(|name| name.to_str()) else {
238            continue;
239        };
240        // The transcript dir is `local_<uuid>`; its name equals
241        // `metadata.sessionId`. A stray `audit.jsonl` elsewhere is not a Cowork
242        // session.
243        if !dir_name.starts_with("local_") {
244            continue;
245        }
246        let Some(workspace) = dir.parent() else {
247            continue;
248        };
249        let meta_path = workspace.join(format!("{dir_name}.json"));
250        let relative_dir = dir.strip_prefix(root).unwrap_or(dir).to_path_buf();
251        out.push(CoworkSession {
252            session_id: dir_name.to_owned(),
253            audit_path,
254            meta_path,
255            relative_dir,
256        });
257    }
258    out.sort_by(|a, b| a.audit_path.cmp(&b.audit_path));
259    Ok(out)
260}
261
262fn read_sessions(
263    sessions: Vec<CoworkSession>,
264    tx: &mpsc::Sender<Result<AdapterYield, AdapterError>>,
265) {
266    for session in sessions {
267        if !read_one_session(session, tx) {
268            return;
269        }
270    }
271}
272
273/// Returns `false` when the consumer dropped the receiver and the read should stop.
274fn read_one_session(
275    file: CoworkSession,
276    tx: &mpsc::Sender<Result<AdapterYield, AdapterError>>,
277) -> bool {
278    macro_rules! emit {
279        ($item:expr) => {
280            if tx.blocking_send($item).is_err() {
281                return false;
282            }
283        };
284    }
285
286    let meta = match read_json(&file.meta_path) {
287        Ok(value) => value,
288        Err(error) => {
289            emit!(Err(error));
290            return true;
291        }
292    };
293    let session = match build_session(&file, &meta) {
294        Ok(session) => session,
295        Err(error) => {
296            emit!(Err(error));
297            return true;
298        }
299    };
300    let created_at = session.created_at;
301    let session_id = session.id.clone();
302    emit!(Ok(AdapterYield::Event(IngestEvent::Session(session))));
303
304    let bytes = match std::fs::read(&file.audit_path) {
305        Ok(bytes) => bytes,
306        Err(error) => {
307            emit!(Err(AdapterError::io(
308                NAME,
309                file.audit_path.display().to_string(),
310                error
311            )));
312            return true;
313        }
314    };
315    let text = match std::str::from_utf8(&bytes) {
316        Ok(text) => text,
317        Err(_) => {
318            emit!(Err(AdapterError::schema(
319                NAME,
320                file.audit_path.display().to_string(),
321                "audit.jsonl is not valid UTF-8",
322            )));
323            return true;
324        }
325    };
326
327    let mut tool_call_names: HashMap<String, Extracted<String>> = HashMap::new();
328    for (index, line) in text.lines().enumerate() {
329        let line_no = index + 1;
330        if line.trim().is_empty() {
331            continue;
332        }
333        if line.len() > RECORD_CAP {
334            emit!(Err(AdapterError::schema(
335                NAME,
336                format!("{}:{line_no}", file.audit_path.display()),
337                format!(
338                    "audit line exceeds adapter record cap: {} bytes > {RECORD_CAP}",
339                    line.len()
340                ),
341            )));
342            continue;
343        }
344        let mut record: Value = match serde_json::from_str(line) {
345            Ok(value) => value,
346            Err(error) => {
347                emit!(Err(AdapterError::parse(
348                    NAME,
349                    file.audit_path.display().to_string(),
350                    line_no,
351                    error,
352                )));
353                continue;
354            }
355        };
356        bound_value(&mut record);
357        capture_tool_call_names(&record, &mut tool_call_names);
358        match record_events(&session_id, line_no, &record, created_at, &tool_call_names) {
359            Ok(events) => {
360                for event in events {
361                    emit!(Ok(AdapterYield::Event(event)));
362                }
363            }
364            Err(message) => emit!(Err(AdapterError::schema(
365                NAME,
366                format!("{}:{line_no}", file.audit_path.display()),
367                message,
368            ))),
369        }
370    }
371    true
372}
373
374/// Read one JSON file whole, bounding every string leaf at the seam cap
375/// (spec.md#adapter-bounded-values).
376fn read_json(path: &Path) -> Result<Value, AdapterError> {
377    use std::io::Read;
378    let io = |source| AdapterError::io(NAME, path.display().to_string(), source);
379    let mut file = std::fs::File::open(path).map_err(io)?;
380    let len = file.metadata().map_err(io)?.len();
381    if len > RECORD_CAP as u64 {
382        return Err(AdapterError::schema(
383            NAME,
384            path.display().to_string(),
385            format!("json file exceeds adapter record cap: {len} bytes > {RECORD_CAP}"),
386        ));
387    }
388    let mut bytes = Vec::with_capacity(len as usize);
389    file.read_to_end(&mut bytes).map_err(io)?;
390    let mut value: Value = serde_json::from_slice(&bytes)
391        .map_err(|error| AdapterError::parse(NAME, path.display().to_string(), 1, error))?;
392    bound_value(&mut value);
393    Ok(value)
394}
395
396fn build_session(file: &CoworkSession, meta: &Value) -> Result<Session, AdapterError> {
397    let display = file.meta_path.display().to_string();
398    // The `local_<uuid>` dir name is authoritative for the id (it equals
399    // `metadata.sessionId`) AND is what the freshness oracle is keyed on in
400    // events_with, so use it directly to keep the two in lockstep - a divergence
401    // would silently disable the freshness skip. The raw metadata (with its own
402    // `sessionId`) is preserved in options.source.raw_record.
403    let session_id = file.session_id.clone();
404
405    let created_at = meta
406        .get("createdAt")
407        .and_then(Value::as_i64)
408        .and_then(DateTime::from_timestamp_millis)
409        .ok_or_else(|| {
410            AdapterError::schema(
411                NAME,
412                display.clone(),
413                "metadata missing numeric `createdAt`",
414            )
415        })?;
416
417    // spec.md#model-project-non-empty: prefer the real folder the user opened
418    // (`userSelectedFolders[0]`), else the sandbox `cwd` (always present). Both
419    // are extracted from real source data, never synthesized.
420    let project = meta
421        .get("userSelectedFolders")
422        .and_then(Value::as_array)
423        .and_then(|folders| folders.first())
424        .filter(|first| first.as_str().is_some_and(|s| !s.is_empty()))
425        .and_then(|first| extract_self_str(first))
426        .or_else(|| extract_str(meta, "cwd").filter(|cwd| !cwd.trim().is_empty()))
427        .ok_or_else(|| {
428            AdapterError::schema(
429                NAME,
430                display,
431                "metadata has neither `userSelectedFolders[0]` nor `cwd` for the project",
432            )
433        })?;
434
435    let mut options = source_options(NAME, meta);
436    if let Some(source) = options.get_mut("source").and_then(Value::as_object_mut) {
437        source.insert(
438            "relative_dir".to_owned(),
439            json!(file.relative_dir.to_string_lossy()),
440        );
441        for key in [
442            "model",
443            "title",
444            "cliSessionId",
445            "systemPrompt",
446            "initialMessage",
447            "enabledMcpTools",
448            "vmProcessName",
449            "accountName",
450        ] {
451            if let Some(value) = meta.get(key) {
452                source.insert(key.to_owned(), value.clone());
453            }
454        }
455    }
456
457    Ok(Session {
458        id: session_id,
459        parent_session_id: None,
460        parent_message_id: None,
461        source_agent: NAME.to_owned(),
462        created_at,
463        project,
464        options,
465    })
466}
467
468/// Stash every `tool_use` block's `id -> name` from an assistant record's
469/// `message.content[]`, so a later `tool_result` row can resolve its name.
470/// Idempotent and safe on any record (non-assistant rows contribute nothing).
471fn capture_tool_call_names(record: &Value, map: &mut HashMap<String, Extracted<String>>) {
472    let Some(items) = record
473        .get("message")
474        .and_then(|message| message.get("content"))
475        .and_then(Value::as_array)
476    else {
477        return;
478    };
479    for item in items {
480        if !matches!(
481            item.get("type").and_then(Value::as_str),
482            Some("tool_use") | Some("server_tool_use")
483        ) {
484            continue;
485        }
486        let (Some(id), Some(name)) = (item.str_field("id"), extract_str(item, "name")) else {
487            continue;
488        };
489        map.insert(id.to_owned(), name);
490    }
491}
492
493/// Map one audit record into canonical events. `user`/`assistant` records carry
494/// an inner Anthropic `message`; everything else becomes a System carrier whose
495/// verbatim row survives in `options.source.raw_record` for lossless restore.
496fn record_events(
497    session_id: &str,
498    line: usize,
499    record: &Value,
500    default_timestamp: DateTime<Utc>,
501    tool_call_names: &HashMap<String, Extracted<String>>,
502) -> Result<Vec<IngestEvent>, String> {
503    let timestamp = record_timestamp(record).unwrap_or(default_timestamp);
504    let uuid = record
505        .get("uuid")
506        .and_then(Value::as_str)
507        .map_or_else(|| format!("{session_id}:{line}"), ToOwned::to_owned);
508    let rtype = record.get("type").and_then(Value::as_str);
509
510    match rtype {
511        Some("user") | Some("assistant") => {
512            let message_value = record.get("message").unwrap_or(&Value::Null);
513            message_events(
514                session_id,
515                &uuid,
516                timestamp,
517                record,
518                message_value,
519                tool_call_names,
520                line,
521            )
522        }
523        // `system` (init/status/api_retry/permission_*), `result`,
524        // `rate_limit_event`, `tool_use_summary`, and any future record type
525        // are kept as System carriers (spec.md#adapter-integrity-no-silent-drops).
526        _ => {
527            let content = extract_str(record, "subtype").or_else(|| extract_str(record, "type"));
528            Ok(vec![IngestEvent::Message(Message::System {
529                id: uuid,
530                session_id: session_id.to_owned(),
531                timestamp,
532                content,
533                options: row_options(record, line),
534            })])
535        }
536    }
537}
538
539fn record_timestamp(record: &Value) -> Option<DateTime<Utc>> {
540    record
541        .get("_audit_timestamp")
542        .or_else(|| record.get("timestamp"))
543        .and_then(Value::as_str)
544        .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
545        .map(|dt| dt.with_timezone(&Utc))
546}
547
548fn message_events(
549    session_id: &str,
550    uuid: &str,
551    timestamp: DateTime<Utc>,
552    record: &Value,
553    message_value: &Value,
554    tool_call_names: &HashMap<String, Extracted<String>>,
555    line: usize,
556) -> Result<Vec<IngestEvent>, String> {
557    let role = message_value
558        .get("role")
559        .and_then(Value::as_str)
560        .ok_or_else(|| "message missing role".to_owned())?;
561    let content = message_value.get("content").unwrap_or(&Value::Null);
562    let mut parts = Vec::new();
563    let message = match (role, content) {
564        ("user", Value::String(_)) => {
565            parts.push(text_part(
566                session_id,
567                uuid,
568                0,
569                extract_self_str(content),
570                Provenance::Conversational,
571            ));
572            Message::User {
573                id: uuid.to_owned(),
574                session_id: session_id.to_owned(),
575                timestamp,
576                options: row_options(record, line),
577            }
578        }
579        ("user", Value::Array(items)) if !items.is_empty() && items.iter().all(is_tool_result) => {
580            let source_tool_result = record.get("tool_use_result").cloned();
581            parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
582                tool_result_part(
583                    session_id,
584                    uuid,
585                    ordinal,
586                    item,
587                    source_tool_result.as_ref(),
588                    tool_call_names,
589                )
590            }));
591            Message::Tool {
592                id: uuid.to_owned(),
593                session_id: session_id.to_owned(),
594                timestamp,
595                options: row_options(record, line),
596            }
597        }
598        ("user", Value::Array(items)) => {
599            parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
600                user_part(session_id, uuid, ordinal, item, tool_call_names)
601            }));
602            Message::User {
603                id: uuid.to_owned(),
604                session_id: session_id.to_owned(),
605                timestamp,
606                options: row_options(record, line),
607            }
608        }
609        ("assistant", Value::Array(items)) => {
610            parts.extend(
611                items
612                    .iter()
613                    .enumerate()
614                    .map(|(ordinal, item)| assistant_part(session_id, uuid, ordinal, item)),
615            );
616            Message::Assistant {
617                id: uuid.to_owned(),
618                session_id: session_id.to_owned(),
619                timestamp,
620                options: assistant_options(record, message_value, line),
621            }
622        }
623        _ => {
624            return Ok(vec![message_carrier_event(
625                session_id, uuid, timestamp, record, line, role,
626            )]);
627        }
628    };
629
630    let mut events = Vec::with_capacity(parts.len() + 1);
631    events.push(IngestEvent::Message(message));
632    events.extend(parts.into_iter().map(IngestEvent::Part));
633    Ok(events)
634}
635
636fn message_carrier_event(
637    session_id: &str,
638    uuid: &str,
639    timestamp: DateTime<Utc>,
640    record: &Value,
641    line: usize,
642    role: &str,
643) -> IngestEvent {
644    IngestEvent::Message(Message::System {
645        id: uuid.to_owned(),
646        session_id: session_id.to_owned(),
647        timestamp,
648        content: extract_self_str(&Value::String(role.to_owned())),
649        options: row_options(record, line),
650    })
651}
652
653fn text_part(
654    session_id: &str,
655    message_id: &str,
656    ordinal: usize,
657    text: Option<Extracted<String>>,
658    provenance: Provenance,
659) -> Part {
660    Part {
661        session_id: session_id.to_owned(),
662        id: part_id(message_id, ordinal),
663        message_id: message_id.to_owned(),
664        ordinal: part_ordinal(ordinal),
665        provenance,
666        options: empty_options(),
667        kind: PartKind::Text { text },
668    }
669}
670
671fn user_part(
672    session_id: &str,
673    message_id: &str,
674    ordinal: usize,
675    value: &Value,
676    tool_call_names: &HashMap<String, Extracted<String>>,
677) -> Part {
678    match value.get("type").and_then(Value::as_str) {
679        Some("text") => text_part(
680            session_id,
681            message_id,
682            ordinal,
683            extract_str(value, "text"),
684            Provenance::Conversational,
685        ),
686        Some("image") | Some("file") => file_part(
687            session_id,
688            message_id,
689            ordinal,
690            value,
691            Provenance::Conversational,
692        ),
693        Some("tool_result") => tool_result_part(
694            session_id,
695            message_id,
696            ordinal,
697            value,
698            None,
699            tool_call_names,
700        ),
701        // Unknown user block: preserve the raw JSON in a Text slot rather than
702        // drop it - a lossless encoding, not a synthesized value.
703        _ => text_part(
704            session_id,
705            message_id,
706            ordinal,
707            Some(extract_compact_repr(value)),
708            Provenance::Conversational,
709        ),
710    }
711}
712
713fn assistant_part(session_id: &str, message_id: &str, ordinal: usize, value: &Value) -> Part {
714    match value.get("type").and_then(Value::as_str) {
715        Some("text") => text_part(
716            session_id,
717            message_id,
718            ordinal,
719            extract_str(value, "text"),
720            Provenance::Conversational,
721        ),
722        Some("thinking") => Part {
723            session_id: session_id.to_owned(),
724            id: part_id(message_id, ordinal),
725            message_id: message_id.to_owned(),
726            ordinal: part_ordinal(ordinal),
727            provenance: Provenance::Conversational,
728            options: signature_options(value),
729            kind: PartKind::Reasoning {
730                text: extract_str(value, "thinking"),
731            },
732        },
733        Some(kind @ ("tool_use" | "server_tool_use")) => Part {
734            session_id: session_id.to_owned(),
735            id: part_id(message_id, ordinal),
736            message_id: message_id.to_owned(),
737            ordinal: part_ordinal(ordinal),
738            provenance: Provenance::Conversational,
739            options: empty_options(),
740            kind: PartKind::ToolCall {
741                call_id: extract_str(value, "id"),
742                name: extract_str(value, "name"),
743                params: value.get("input").cloned().unwrap_or(Value::Null),
744                provider_executed: kind == "server_tool_use",
745            },
746        },
747        Some("image") | Some("file") => file_part(
748            session_id,
749            message_id,
750            ordinal,
751            value,
752            Provenance::Conversational,
753        ),
754        _ => text_part(
755            session_id,
756            message_id,
757            ordinal,
758            Some(extract_compact_repr(value)),
759            Provenance::Conversational,
760        ),
761    }
762}
763
764fn tool_result_part(
765    session_id: &str,
766    message_id: &str,
767    ordinal: usize,
768    value: &Value,
769    source_tool_result: Option<&Value>,
770    tool_call_names: &HashMap<String, Extracted<String>>,
771) -> Part {
772    let call_id = extract_str(value, "tool_use_id");
773    // The name lives on the prior `tool_use`, resolved via the per-session map;
774    // a miss surfaces as `None`, never a sentinel (spec.md#model-no-synthesis).
775    let name = value
776        .str_field("tool_use_id")
777        .and_then(|id| tool_call_names.get(id))
778        .cloned();
779    let result = value
780        .get("content")
781        .cloned()
782        .or_else(|| source_tool_result.cloned())
783        .unwrap_or(Value::Null);
784    Part {
785        session_id: session_id.to_owned(),
786        id: part_id(message_id, ordinal),
787        message_id: message_id.to_owned(),
788        ordinal: part_ordinal(ordinal),
789        // spec.md#model-part-provenance: tool output is runtime-produced.
790        provenance: Provenance::Injected,
791        options: empty_options(),
792        kind: PartKind::ToolResult {
793            call_id,
794            name,
795            is_failure: value
796                .get("is_error")
797                .and_then(Value::as_bool)
798                .unwrap_or(false),
799            result,
800        },
801    }
802}
803
804fn file_part(
805    session_id: &str,
806    message_id: &str,
807    ordinal: usize,
808    value: &Value,
809    provenance: Provenance,
810) -> Part {
811    let media_type = value
812        .get("media_type")
813        .or_else(|| value.get("mime_type"))
814        .and_then(Value::as_str)
815        .map(ToOwned::to_owned);
816    let file_name = value
817        .get("file_name")
818        .or_else(|| value.get("name"))
819        .and_then(Value::as_str)
820        .map(ToOwned::to_owned);
821    let data = if let Some(source) = value.get("source") {
822        if let Some(url) = source.get("url").and_then(Value::as_str) {
823            FileData::Url(url.to_owned())
824        } else if let Some(bytes) = source.get("data").and_then(Value::as_str) {
825            FileData::String(bytes.to_owned())
826        } else {
827            FileData::String(compact_json(source))
828        }
829    } else if let Some(url) = value.get("url").and_then(Value::as_str) {
830        FileData::Url(url.to_owned())
831    } else {
832        FileData::String(compact_json(value))
833    };
834    Part {
835        session_id: session_id.to_owned(),
836        id: part_id(message_id, ordinal),
837        message_id: message_id.to_owned(),
838        ordinal: part_ordinal(ordinal),
839        provenance,
840        options: empty_options(),
841        kind: PartKind::File {
842            media_type,
843            file_name,
844            data,
845        },
846    }
847}
848
849fn row_options(record: &Value, line: usize) -> ProviderOptions {
850    let mut options = source_options(NAME, record);
851    if let Some(source) = options.get_mut("source").and_then(Value::as_object_mut) {
852        source.insert("line".to_owned(), json!(line));
853        source.insert("raw_type".to_owned(), json!(record.get("type")));
854        if let Some(subtype) = record.get("subtype") {
855            source.insert("subtype".to_owned(), subtype.clone());
856        }
857    }
858    options
859}
860
861fn assistant_options(record: &Value, message_value: &Value, line: usize) -> ProviderOptions {
862    let mut options = row_options(record, line);
863    let anthropic = json!({
864        "id": message_value.get("id"),
865        "model": message_value.get("model"),
866        "stop_reason": message_value.get("stop_reason"),
867        "usage": message_value.get("usage"),
868    });
869    options.insert("anthropic".to_owned(), anthropic);
870    options
871}
872
873fn signature_options(value: &Value) -> ProviderOptions {
874    let mut options = ProviderOptions::new();
875    if let Some(signature) = value.get("signature").and_then(Value::as_str) {
876        options.insert("anthropic".to_owned(), json!({ "signature": signature }));
877    }
878    options
879}
880
881fn is_tool_result(value: &Value) -> bool {
882    value.get("type").and_then(Value::as_str) == Some("tool_result")
883}
884
885fn serialize_session(
886    session: &crate::sessions::SessionWithMessages,
887    fidelity: RestoreFidelity,
888) -> Result<Vec<RestoredFile>, AdapterError> {
889    // Native restore replays the verbatim `audit.jsonl` rows (each message's
890    // stored `raw_record`) in source-line order, plus the metadata file from the
891    // session's stored `raw_record`. spec.md#adapter-native-restore-lossless:
892    // a session ingested without raw records (foreign-sourced) can't be replayed
893    // natively, so we re-enter forcing Foreign (which stamps actual_fidelity).
894    let session_raw = raw_record(&session.session.options);
895    if fidelity == RestoreFidelity::Native && session_raw.is_none() {
896        return serialize_session(session, RestoreFidelity::Foreign);
897    }
898
899    let relative_dir = session
900        .session
901        .options
902        .get("source")
903        .and_then(|source| source.get("relative_dir"))
904        .and_then(Value::as_str)
905        .map(PathBuf::from)
906        .unwrap_or_else(|| PathBuf::from(&session.session.id));
907
908    let mut messages = session.messages.clone();
909    if fidelity == RestoreFidelity::Native {
910        messages.sort_by(|left, right| {
911            source_line(left.message.options())
912                .cmp(&source_line(right.message.options()))
913                .then_with(|| by_timestamp_then_id(left, right))
914        });
915    } else {
916        messages.sort_by(by_timestamp_then_id);
917    }
918
919    let mut records = Vec::with_capacity(messages.len());
920    for message in &messages {
921        if fidelity == RestoreFidelity::Native {
922            if let Some(raw) = raw_record(message.message.options()) {
923                records.push(raw);
924            }
925            continue;
926        }
927        if let Some(record) = foreign_record(&session.session.id, message) {
928            records.push(record);
929        }
930    }
931
932    let mut files = vec![RestoredFile::new(
933        relative_dir.join("audit.jsonl"),
934        jsonl_bytes(NAME, &records)?,
935        fidelity,
936    )];
937
938    // The metadata sits a level up, named after the session dir: `local_x.json`.
939    let meta_value = match fidelity {
940        RestoreFidelity::Native => session_raw,
941        RestoreFidelity::Foreign => Some(foreign_metadata(session)),
942    };
943    if let (Some(meta), Some(parent), Some(dir_name)) = (
944        meta_value,
945        relative_dir.parent(),
946        relative_dir.file_name().and_then(|name| name.to_str()),
947    ) {
948        files.push(RestoredFile::new(
949            parent.join(format!("{dir_name}.json")),
950            serde_json::to_vec(&meta).map_err(|error| {
951                AdapterError::schema(
952                    NAME,
953                    &session.session.id,
954                    format!("json encode failed: {error}"),
955                )
956            })?,
957            fidelity,
958        ));
959    }
960    Ok(files)
961}
962
963/// Best-effort metadata for a foreign session: the fields `build_session`
964/// reads back, derived from canonical data.
965fn foreign_metadata(session: &crate::sessions::SessionWithMessages) -> Value {
966    json!({
967        "sessionId": session.session.id,
968        "createdAt": session.session.created_at.timestamp_millis(),
969        "cwd": &*session.session.project,
970    })
971}
972
973/// Best-effort `audit.jsonl` row for a foreign session, mirroring the Anthropic
974/// `message` envelope this adapter reads.
975fn foreign_record(session_id: &str, message: &crate::sessions::MessageWithParts) -> Option<Value> {
976    let (rtype, role) = match &message.message {
977        Message::User { .. } => ("user", "user"),
978        Message::Assistant { .. } => ("assistant", "assistant"),
979        Message::Tool { .. } => ("user", "user"),
980        // System carriers have no idiomatic audit row; content stays canonical.
981        Message::System { .. } => return None,
982    };
983    let content = Value::Array(message.parts.iter().map(audit_part).collect());
984    Some(json!({
985        "type": rtype,
986        "session_id": session_id,
987        "uuid": message.message.id(),
988        "message": { "role": role, "content": content },
989        "_audit_timestamp": message
990            .message
991            .timestamp()
992            .to_rfc3339_opts(SecondsFormat::Millis, true),
993    }))
994}
995
996fn audit_part(part: &Part) -> Value {
997    match &part.kind {
998        PartKind::Text { text } => json!({ "type": "text", "text": extracted_text(text) }),
999        PartKind::Reasoning { text } => {
1000            json!({ "type": "thinking", "thinking": extracted_text(text) })
1001        }
1002        PartKind::ToolCall {
1003            call_id,
1004            name,
1005            params,
1006            provider_executed,
1007        } => json!({
1008            "type": if *provider_executed { "server_tool_use" } else { "tool_use" },
1009            "id": extracted_text(call_id),
1010            "name": extracted_text(name),
1011            "input": params,
1012        }),
1013        PartKind::ToolResult {
1014            call_id,
1015            is_failure,
1016            result,
1017            ..
1018        } => json!({
1019            "type": "tool_result",
1020            "tool_use_id": extracted_text(call_id),
1021            "is_error": is_failure,
1022            "content": result,
1023        }),
1024        other => json!({
1025            "type": "text",
1026            "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null)),
1027        }),
1028    }
1029}
1030
1031/// Read the stored source line for a message (`options.source.line`), used to
1032/// replay `audit.jsonl` rows in their original order on native restore.
1033fn source_line(options: &ProviderOptions) -> Option<u64> {
1034    options
1035        .get("source")
1036        .and_then(|source| source.get("line"))
1037        .and_then(Value::as_u64)
1038}
1039
#[cfg(test)]
mod tests {
    //! Fixture-driven tests over the committed Cowork corpus
    //! (`tests/fixtures/adapter/claude_desktop_app/`). They cover ingest into
    //! the canonical shape, native restore, and the regression guard that the
    //! nested `.claude/` inner Claude Code loop is never ingested.
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use crate::{handlers::ingest_adapter, sessions::Store};
    use tempfile::TempDir;

    // Anchored at the manifest dir: unit tests must not depend on the process
    // cwd (figment::Jail chdirs the whole test process while config tests run).
    const FIXTURES: &str = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/fixtures/adapter/claude_desktop_app/local-agent-mode-sessions"
    );
    /// Id of the inner Claude Code loop transcript nested under one session's
    /// `.claude/projects/`; it must never show up as a session.
    const INNER_LOOP_ID: &str = "a9985b0b-2f5e-4125-b105-7f62376f5509";

    /// The default probe resolves the Cowork store under the home directory.
    #[test]
    fn probe_default_finds_cowork_store_under_home() -> anyhow::Result<()> {
        let expected = [
            "Library",
            "Application Support",
            "Claude",
            "local-agent-mode-sessions",
        ];
        crate::adapter::test_support::assert_probe_default(&ClaudeDesktopAppFactory, &expected)
    }

    /// Ingesting the fixture corpus yields exactly the `audit.jsonl` sessions
    /// and exercises every part mapping at least once.
    #[tokio::test(flavor = "multi_thread")]
    async fn ingests_cowork_fixture_into_canonical_shape() -> anyhow::Result<()> {
        let workdir = TempDir::new()?;
        let store = Store::open_local(workdir.path()).await?;
        let adapter = ClaudeDesktopAppAdapter::new(FIXTURES);
        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
        assert_eq!(summary.dropped_sessions, 0, "no session-level rejections");

        let ids = store.session_ids().await?;
        // Four `audit.jsonl` sessions; the nested `.claude/**/*.jsonl` inner loop
        // must NOT become a fifth session (spec.md#adapters double-count guard).
        assert_eq!(ids.len(), 4, "exactly the four audit.jsonl sessions");
        assert!(
            ids.iter().all(|id| !id.contains(INNER_LOOP_ID)),
            "the nested inner Claude Code loop must not be ingested as a session",
        );
        for id in &ids {
            assert!(
                id.starts_with("local_"),
                "session id is the metadata sessionId (local_<uuid>): {id}",
            );
        }

        // Coverage flags, OR-ed across every part of every session.
        let mut saw_call = false;
        let mut saw_resolved_result = false;
        let mut saw_reasoning = false;
        let mut saw_system = false;
        for id in &ids {
            let session = store.get_session(id).await?.expect("session round-trips");
            assert_eq!(session.session.source_agent, NAME);
            assert!(
                !(*session.session.project).is_empty(),
                "spec.md#model-project-non-empty",
            );
            saw_system |= session
                .messages
                .iter()
                .any(|stored| matches!(stored.message, Message::System { .. }));
            for part in session.messages.iter().flat_map(|stored| stored.parts.iter()) {
                saw_call |= matches!(part.kind, PartKind::ToolCall { .. });
                saw_resolved_result |=
                    matches!(&part.kind, PartKind::ToolResult { name, .. } if name.is_some());
                saw_reasoning |= matches!(part.kind, PartKind::Reasoning { .. });
            }
        }
        assert!(saw_call, "assistant tool_use -> ToolCall");
        assert!(
            saw_resolved_result,
            "tool_result name resolved via the per-session tool_use map",
        );
        assert!(saw_reasoning, "assistant thinking -> Reasoning");
        assert!(
            saw_system,
            "system/result/... records become System carriers"
        );
        Ok(())
    }

    /// Native restore output re-ingests as the same number of sessions.
    #[tokio::test(flavor = "multi_thread")]
    async fn native_restore_round_trips() -> anyhow::Result<()> {
        let workdir = TempDir::new()?;
        let store = Store::open_local(workdir.path().join("store")).await?;
        let adapter = ClaudeDesktopAppAdapter::new(FIXTURES);
        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
        let original = store.session_ids().await?;

        // Native restore replays each session's audit.jsonl + metadata; gather
        // every file first (distinct dirs, no collision), then write the tree once.
        let mut restored_files = Vec::new();
        for id in &original {
            let session = store.get_session(id).await?.expect("round-trips");
            let serialized = ClaudeDesktopAppFactory.serialize(&session, RestoreFidelity::Native)?;
            restored_files.extend(serialized);
        }
        let restore_root = workdir.path().join("restore");
        crate::adapter::write_restored_files(&restore_root, restored_files)?;

        let restore_store = Store::open_local(workdir.path().join("restore-store")).await?;
        let restored = ClaudeDesktopAppAdapter::new(&restore_root);
        ingest_adapter(
            &restore_store,
            &restored,
            &crate::adapter::NoopOracle,
            |_| {},
        )
        .await?;
        let reingested = restore_store.session_ids().await?;
        assert_eq!(
            reingested.len(),
            original.len(),
            "native restore re-ingests as the same session set",
        );
        Ok(())
    }

    /// A `user` row whose `content` is null becomes a single System carrier
    /// keyed by the row's uuid, with the row type as its content.
    #[test]
    fn unexpected_message_content_becomes_lossless_carrier() {
        let record = json!({
            "type": "user",
            "uuid": "local-message-1",
            "_audit_timestamp": "2026-06-01T00:00:00Z",
            "message": {
                "role": "user",
                "content": null,
            },
        });
        let names = HashMap::new();

        let events = record_events("local_session", 7, &record, Utc::now(), &names)
            .expect("carrier is valid");
        assert_eq!(events.len(), 1);
        let is_system_carrier = matches!(
            &events[0],
            IngestEvent::Message(Message::System { id, content, .. })
                if id == "local-message-1" && content.as_deref().map(String::as_str) == Some("user")
        );
        assert!(is_system_carrier);
    }
}