Skip to main content

pond/adapter/
opencode.rs

1//! opencode adapter (github.com/sst/opencode).
2//!
//! Unlike claude-code and codex-cli, opencode does not store one JSONL file per
//! session. It uses an id-keyed split layout under a `storage/` root:
5//!
6//! - `session/<projectID>/<sessionID>.json` - session metadata
7//! - `message/<sessionID>/<messageID>.json` - one message header (no content)
8//! - `part/<messageID>/<partID>.json`        - one content part
9//!
10//! So this is the first adapter to drive the [`Adapter`] seam directly rather
11//! than through the JSONL helper: it walks the three levels, sorts by id (the
12//! ids are lexically time-sortable, so filename order is creation order), and
13//! emits `Session -> Message -> Parts` per session.
14//!
15//! opencode fuses a tool call and its result into one `tool` part on the
16//! assistant message. Canonical keeps the two apart (a `tool_result` on an
17//! assistant message is a category error, spec.md#model-part-provenance), so the
18//! adapter splits it: a `ToolCall` Part stays on the assistant message and a
19//! synthetic `Tool` message carries the `ToolResult`. Native restore replays
20//! each real part's stored `raw_record` at its original path and skips the
21//! synthetic records, so the split is value-complete-lossless.
22
23use std::path::{Path, PathBuf};
24
25use async_stream::stream;
26use chrono::{DateTime, Utc};
27use serde_json::{Value, json};
28use tokio::sync::mpsc;
29
30use crate::{
31    sessions::IngestEvent,
32    wire::{FileData, Message, Part, PartKind, Provenance, ProviderOptions, Session},
33};
34
35use super::{
36    Adapter, AdapterError, AdapterFactory, AdapterYield, AdapterYieldStream, DiscoverFuture, Env,
37    RestoreFidelity, RestoredFile, SkipOracle, SkipReason, compact_json, config_path,
38    extract::{bound_value, extract_str},
39    jsonl::RECORD_CAP,
40    part_id, part_ordinal, raw_record, source_options, validate_path_id,
41};
42
/// Adapter name: returned by the factory, passed to every `AdapterError`
/// constructor and path check in this module, and stamped on each session as
/// `source_agent`.
const NAME: &str = "opencode";

/// Event-channel bound; doubles as backpressure - the blocking reader parks on
/// `blocking_send` when the consumer lags.
const CHANNEL_CAP: usize = 256;
48
49/// Stateless factory: opens [`OpencodeAdapter`] instances and probes for the
50/// canonical install location under `~/.local/share/opencode/storage`.
51pub struct OpencodeFactory;
52
53impl AdapterFactory for OpencodeFactory {
54    fn name(&self) -> &'static str {
55        NAME
56    }
57
58    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
59        Ok(Box::new(OpencodeAdapter::new(config_path(NAME, config)?)))
60    }
61
62    fn probe_default(&self, env: &Env) -> Option<Value> {
63        let path = env
64            .home
65            .join(".local")
66            .join("share")
67            .join("opencode")
68            .join("storage");
69        path.exists().then(|| json!({ "path": path }))
70    }
71
72    fn serialize(
73        &self,
74        session: &crate::sessions::SessionWithMessages,
75        fidelity: RestoreFidelity,
76    ) -> Result<Vec<RestoredFile>, AdapterError> {
77        match fidelity {
78            RestoreFidelity::Native => serialize_native(session),
79            RestoreFidelity::Foreign => serialize_foreign(session),
80        }
81    }
82}
83
/// Reader over one opencode `storage/` directory.
#[derive(Debug, Clone)]
pub struct OpencodeAdapter {
    /// The `storage/` root that `session/`, `message/` and `part/` hang off.
    root: PathBuf,
}

impl OpencodeAdapter {
    /// Create a reader rooted at `root`.
    pub fn new(root: impl Into<PathBuf>) -> Self {
        let root = root.into();
        Self { root }
    }
}
95
impl Adapter for OpencodeAdapter {
    /// Count the session files under the root. The directory walk runs on a
    /// blocking task; a failed join is reported through [`join_error`].
    fn discover(&self) -> DiscoverFuture<'_> {
        let root = self.root.clone();
        Box::pin(async move {
            tokio::task::spawn_blocking(move || {
                collect_session_files(&root).map(|files| files.len())
            })
            .await
            .map_err(join_error)?
        })
    }

    /// Stream ingest events for every session under the root.
    ///
    /// Three phases: (1) list the session files; (2) for each session the
    /// oracle has a watermark for, walk its subtree and yield `Skipped` when
    /// nothing is newer than the watermark; (3) read the surviving sessions on
    /// a blocking task and forward what it sends over a bounded channel.
    /// An error in phase 1 or 2 is yielded and ends the stream.
    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
        let adapter = self.clone();
        Box::pin(stream! {
            let files = {
                let root = adapter.root.clone();
                tokio::task::spawn_blocking(move || collect_session_files(&root)).await
            };
            // Outer `Result` is the task join, inner is the directory walk.
            let files = match files {
                Ok(Ok(files)) => files,
                Ok(Err(error)) => { yield Err(error); return; }
                Err(join) => { yield Err(join_error(join)); return; }
            };

            // Subtree mtime walks happen ONLY when the oracle has a watermark
            // for this session - on a first ingest (or NoopOracle) every walk
            // is wasted work since there's nothing to compare against.
            let mut survivors = Vec::with_capacity(files.len());
            for mut file in files {
                if let Some(ingested) = oracle.last_ingested_at(&file.session_id) {
                    let walk = {
                        let root = adapter.root.clone();
                        let session_path = file.path.clone();
                        let session_id = file.session_id.clone();
                        tokio::task::spawn_blocking(move || {
                            walk_session_subtree(&root, &session_path, &session_id)
                        })
                        .await
                    };
                    let walk = match walk {
                        Ok(Ok(walk)) => walk,
                        Ok(Err(error)) => { yield Err(error); return; }
                        Err(join) => { yield Err(join_error(join)); return; }
                    };
                    // A walk with no readable mtime at all (`None`) is never
                    // treated as fresh; the session is re-read.
                    if let Some(mtime) = walk.newest_mtime
                        && mtime <= ingested
                    {
                        yield Ok(AdapterYield::Skipped {
                            session_id: Some(file.session_id.clone()),
                            project: None,
                            reason: SkipReason::Fresh,
                        });
                        continue;
                    }
                    // Cache the walk so the read pass doesn't re-list the
                    // same dirs (we already paid for them above).
                    file.cached_subtree = Some(walk);
                }
                survivors.push(file);
            }

            // The reader owns `tx`; when it finishes, the sender drops, `recv`
            // returns `None`, and the forwarding loop ends.
            let (tx, mut rx) = mpsc::channel(CHANNEL_CAP);
            let reader = adapter.clone();
            let handle = tokio::task::spawn_blocking(move || read_sessions(&reader, survivors, &tx));
            while let Some(item) = rx.recv().await {
                yield item;
            }
            // Surface a failed reader task as a final error.
            if let Err(join) = handle.await {
                yield Err(join_error(join));
            }
        })
    }
}
170
171/// A blocking-task panic is a pond bug, not bad source data, so it fails the
172/// whole run rather than skipping a session.
173fn join_error(join: tokio::task::JoinError) -> AdapterError {
174    AdapterError::io(
175        NAME,
176        "blocking read task",
177        std::io::Error::other(join.to_string()),
178    )
179}
180
/// One session file located on disk. `cached_subtree` is populated only when
/// the freshness pre-walk happened (i.e. the oracle had a watermark for this
/// session); the read pass reuses the listings instead of re-walking.
struct SessionFile {
    /// Session id taken from the file stem (not from the JSON body).
    session_id: String,
    /// Full path of the `<sessionID>.json` file.
    path: PathBuf,
    /// Listings from the freshness pre-walk, if one ran.
    cached_subtree: Option<SubtreeWalk>,
}
189
/// Result of one subtree walk: the newest mtime across session+messages+parts
/// (for the freshness check) and the directory listings (so the read pass
/// doesn't redo them).
struct SubtreeWalk {
    /// Newest modification time seen; `None` when no mtime could be read.
    newest_mtime: Option<DateTime<Utc>>,
    /// Sorted message files of the session.
    message_files: Vec<PathBuf>,
    /// One entry per message file, in the same order; each is the sorted list
    /// of part files for that message. Empty vec = message has no parts.
    part_files_by_message: Vec<Vec<PathBuf>>,
}
200
201/// Walk `<root>/session/<projectID>/<sessionID>.json`, sorted for deterministic
202/// ingest order. A missing `session/` dir means "no sessions yet", not an error.
203fn collect_session_files(root: &Path) -> Result<Vec<SessionFile>, AdapterError> {
204    let session_root = root.join("session");
205    let io = |path: &Path, source| AdapterError::io(NAME, path.display().to_string(), source);
206    let entries = match std::fs::read_dir(&session_root) {
207        Ok(entries) => entries,
208        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
209        Err(error) => return Err(io(&session_root, error)),
210    };
211    let mut out = Vec::new();
212    for project in entries {
213        let project = project.map_err(|error| io(&session_root, error))?;
214        if !project
215            .file_type()
216            .map_err(|error| io(&project.path(), error))?
217            .is_dir()
218        {
219            continue;
220        }
221        let project_dir = project.path();
222        for session in std::fs::read_dir(&project_dir).map_err(|error| io(&project_dir, error))? {
223            let session = session.map_err(|error| io(&project_dir, error))?;
224            let path = session.path();
225            if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
226                continue;
227            }
228            let Some(session_id) = path
229                .file_stem()
230                .and_then(|s| s.to_str())
231                .map(ToOwned::to_owned)
232            else {
233                continue;
234            };
235            validate_path_id(
236                NAME,
237                "session file name",
238                &session_id,
239                path.display().to_string(),
240            )?;
241            out.push(SessionFile {
242                session_id,
243                path,
244                cached_subtree: None,
245            });
246        }
247    }
248    out.sort_by(|a, b| a.path.cmp(&b.path));
249    Ok(out)
250}
251
252/// Walk one session's full subtree: the session file, every message file under
253/// `message/<sid>/`, every part file under `part/<mid>/`. Returns the newest
254/// mtime seen plus the cached listings so the read pass can reuse them.
255fn walk_session_subtree(
256    root: &Path,
257    session_path: &Path,
258    session_id: &str,
259) -> Result<SubtreeWalk, AdapterError> {
260    let mut newest = json_mtime(session_path);
261    let message_dir = root.join("message").join(session_id);
262    let message_files = list_json_sorted(&message_dir)?;
263    let mut part_files_by_message = Vec::with_capacity(message_files.len());
264    for message_path in &message_files {
265        newest = newest.max(json_mtime(message_path));
266        let Some(message_id) = message_path.file_stem().and_then(|stem| stem.to_str()) else {
267            part_files_by_message.push(Vec::new());
268            continue;
269        };
270        validate_path_id(
271            NAME,
272            "message file name",
273            message_id,
274            message_path.display().to_string(),
275        )?;
276        let part_dir = root.join("part").join(message_id);
277        let parts = list_json_sorted(&part_dir)?;
278        for part_path in &parts {
279            newest = newest.max(json_mtime(part_path));
280        }
281        part_files_by_message.push(parts);
282    }
283    Ok(SubtreeWalk {
284        newest_mtime: newest,
285        message_files,
286        part_files_by_message,
287    })
288}
289
290fn json_mtime(path: &Path) -> Option<DateTime<Utc>> {
291    std::fs::metadata(path)
292        .and_then(|meta| meta.modified())
293        .ok()
294        .map(DateTime::<Utc>::from)
295}
296
297fn read_sessions(
298    adapter: &OpencodeAdapter,
299    sessions: Vec<SessionFile>,
300    tx: &mpsc::Sender<Result<AdapterYield, AdapterError>>,
301) {
302    for session in sessions {
303        if !read_one_session(adapter, session, tx) {
304            return;
305        }
306    }
307}
308
/// Read one session and send its events: the `Session`, then for each message
/// file the events `build_message_events` produces.
///
/// Failures are sent on the channel as `Err` items, not returned. A failure
/// reading or validating the session abandons that session; a failure on a
/// message abandons that message and moves to the next; a part file that
/// fails to read is reported and left out, and the message is still built
/// from the parts that did load.
///
/// Returns `false` when the consumer dropped the receiver and the read should stop.
fn read_one_session(
    adapter: &OpencodeAdapter,
    file: SessionFile,
    tx: &mpsc::Sender<Result<AdapterYield, AdapterError>>,
) -> bool {
    // Send one item; a send error means the receiver is gone, so bail out of
    // the whole function with `false`.
    macro_rules! emit {
        ($item:expr) => {
            if tx.blocking_send($item).is_err() {
                return false;
            }
        };
    }

    let session_value = match read_json(&file.path) {
        Ok(value) => value,
        Err(error) => {
            emit!(Err(error));
            return true;
        }
    };
    let session = match session_from_value(&session_value, &file.path) {
        Ok(session) => session,
        Err(error) => {
            emit!(Err(error));
            return true;
        }
    };
    // From here on the id is the one stored in the JSON body, which is then
    // used as a directory name, hence the path-safety check.
    let session_id = session.id.clone();
    if let Err(error) = validate_path_id(
        NAME,
        "session id",
        &session_id,
        file.path.display().to_string(),
    ) {
        emit!(Err(error));
        return true;
    }
    let session_created_at = session.created_at;
    emit!(Ok(AdapterYield::Event(IngestEvent::Session(session))));

    // Reuse the freshness pre-walk's listings when present; otherwise list now.
    // NOTE(review): the cached listings were keyed by the file stem, while the
    // uncached path below uses the `id` from the JSON body - the two agree
    // only when a file's stem equals its stored id; confirm that is guaranteed.
    let (message_files, mut part_files_by_message) = match file.cached_subtree {
        Some(walk) => (walk.message_files, walk.part_files_by_message),
        None => {
            let message_dir = adapter.root.join("message").join(&session_id);
            let files = match list_json_sorted(&message_dir) {
                Ok(files) => files,
                Err(error) => {
                    emit!(Err(error));
                    return true;
                }
            };
            (files, Vec::new())
        }
    };
    // The cached part lists hold one entry per message file, so they are
    // non-empty exactly when a cache exists and the session has messages.
    let use_cache = !part_files_by_message.is_empty();

    for (index, message_path) in message_files.iter().enumerate() {
        let message_value = match read_json(message_path) {
            Ok(value) => value,
            Err(error) => {
                emit!(Err(error));
                continue;
            }
        };
        let Some(message_id) = message_value.get("id").and_then(Value::as_str) else {
            emit!(Err(AdapterError::schema(
                NAME,
                message_path.display().to_string(),
                "message file missing `id`",
            )));
            continue;
        };
        if let Err(error) = validate_path_id(
            NAME,
            "message id",
            message_id,
            message_path.display().to_string(),
        ) {
            emit!(Err(error));
            continue;
        }
        // `index` lines up with the cache because `enumerate` advances even
        // for messages skipped with `continue` above.
        let part_files = if use_cache {
            std::mem::take(&mut part_files_by_message[index])
        } else {
            let part_dir = adapter.root.join("part").join(message_id);
            match list_json_sorted(&part_dir) {
                Ok(files) => files,
                Err(error) => {
                    emit!(Err(error));
                    continue;
                }
            }
        };
        let mut parts = Vec::with_capacity(part_files.len());
        for part_path in part_files {
            match read_json(&part_path) {
                Ok(value) => parts.push(value),
                Err(error) => emit!(Err(error)),
            }
        }
        match build_message_events(&session_id, &message_value, &parts, session_created_at) {
            Ok(events) => {
                for event in events {
                    emit!(Ok(AdapterYield::Event(event)));
                }
            }
            Err(error) => emit!(Err(error)),
        }
    }
    true
}
422
423/// Read one JSON file, bounding every string leaf at the seam cap
424/// (spec.md#adapter-bounded-values) before it leaves this module. One open +
425/// one metadata syscall on the handle - avoids the duplicate `std::fs::metadata`
426/// + `std::fs::read` pair on the ~1k-file real corpus.
427fn read_json(path: &Path) -> Result<Value, AdapterError> {
428    use std::io::Read;
429    let io = |source| AdapterError::io(NAME, path.display().to_string(), source);
430    let mut file = std::fs::File::open(path).map_err(io)?;
431    let len = file.metadata().map_err(io)?.len();
432    if len > RECORD_CAP as u64 {
433        return Err(AdapterError::schema(
434            NAME,
435            path.display().to_string(),
436            format!("json file exceeds adapter record cap: {len} bytes > {RECORD_CAP}"),
437        ));
438    }
439    let mut bytes = Vec::with_capacity(len as usize);
440    file.read_to_end(&mut bytes).map_err(io)?;
441    let mut value: Value = serde_json::from_slice(&bytes)
442        .map_err(|error| AdapterError::parse(NAME, path.display().to_string(), 1, error))?;
443    bound_value(&mut value);
444    Ok(value)
445}
446
447/// List `*.json` files in `dir`, sorted by filename (= creation order, ids are
448/// time-sortable). A missing dir is an empty list - a message can legitimately
449/// carry no parts.
450fn list_json_sorted(dir: &Path) -> Result<Vec<PathBuf>, AdapterError> {
451    let entries = match std::fs::read_dir(dir) {
452        Ok(entries) => entries,
453        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
454        Err(error) => return Err(AdapterError::io(NAME, dir.display().to_string(), error)),
455    };
456    let mut out = Vec::new();
457    for entry in entries {
458        let entry =
459            entry.map_err(|error| AdapterError::io(NAME, dir.display().to_string(), error))?;
460        let path = entry.path();
461        if path.extension().and_then(|ext| ext.to_str()) == Some("json") {
462            out.push(path);
463        }
464    }
465    out.sort();
466    Ok(out)
467}
468
469fn session_from_value(value: &Value, path: &Path) -> Result<Session, AdapterError> {
470    let display = path.display().to_string();
471    let id = value
472        .get("id")
473        .and_then(Value::as_str)
474        .ok_or_else(|| AdapterError::schema(NAME, display.clone(), "session missing `id`"))?
475        .to_owned();
476    let created_at = millis_at(value, &["time", "created"]).ok_or_else(|| {
477        AdapterError::schema(NAME, display.clone(), "session missing `time.created`")
478    })?;
479    // spec.md#model-project-non-empty: opencode always records `directory` (the
480    // project cwd); its absence is a malformed session, not a default.
481    let project = extract_str(value, "directory")
482        .ok_or_else(|| AdapterError::schema(NAME, display, "session missing `directory`"))?;
483
484    let options = opencode_raw(value);
485
486    Ok(Session {
487        id,
488        // opencode sub-sessions (a Task spawn) carry `parentID`; a soft
489        // reference, present only when this session was spawned from another.
490        parent_session_id: value
491            .get("parentID")
492            .and_then(Value::as_str)
493            .map(ToOwned::to_owned),
494        parent_message_id: None,
495        source_agent: NAME.to_owned(),
496        created_at,
497        project,
498        options,
499    })
500}
501
502/// Build the ordered event stream for one message: the message, its parts in
503/// order, then any synthetic `Tool` messages (one per `tool` part) each
504/// followed by its `ToolResult`.
505fn build_message_events(
506    session_id: &str,
507    message_value: &Value,
508    part_values: &[Value],
509    default_timestamp: DateTime<Utc>,
510) -> Result<Vec<IngestEvent>, AdapterError> {
511    let message_id = message_value
512        .get("id")
513        .and_then(Value::as_str)
514        .ok_or_else(|| AdapterError::schema(NAME, session_id.to_owned(), "message missing `id`"))?;
515    let role = message_value.get("role").and_then(Value::as_str);
516    let timestamp = millis_at(message_value, &["time", "created"]).unwrap_or(default_timestamp);
517
518    let options = opencode_raw(message_value);
519    let message = match role {
520        Some("user") => Message::User {
521            id: message_id.to_owned(),
522            session_id: session_id.to_owned(),
523            timestamp,
524            options,
525        },
526        Some("assistant") => Message::Assistant {
527            id: message_id.to_owned(),
528            session_id: session_id.to_owned(),
529            timestamp,
530            options,
531        },
532        // opencode v2 carries non-conversational message roles (synthetic,
533        // shell, compaction); keep them as System carriers rather than drop
534        // them (spec.md#adapter-integrity-no-silent-drops). The raw record
535        // survives in options; the role label is the content.
536        _ => Message::System {
537            id: message_id.to_owned(),
538            session_id: session_id.to_owned(),
539            timestamp,
540            content: extract_str(message_value, "role"),
541            options,
542        },
543    };
544
545    let mut events = vec![IngestEvent::Message(message)];
546    let mut deferred = Vec::new();
547    for (ordinal, part_value) in part_values.iter().enumerate() {
548        let mapped = map_part(session_id, message_id, ordinal, part_value, timestamp)?;
549        events.push(IngestEvent::Part(mapped.part));
550        if let Some(split) = mapped.tool_split {
551            deferred.push(split);
552        }
553    }
554    for ToolSplit {
555        message: tool_message,
556        result,
557    } in deferred
558    {
559        events.push(IngestEvent::Message(tool_message));
560        events.push(IngestEvent::Part(result));
561    }
562    Ok(events)
563}
564
/// One mapped source part: the canonical Part it becomes, plus - for a fused
/// `tool` part - the synthetic `Tool` message and `ToolResult` it splits off.
struct MappedPart {
    /// The canonical part that stays on the owning message.
    part: Part,
    /// Present only for `tool` parts; `None` for every other part type.
    tool_split: Option<ToolSplit>,
}
571
/// The synthetic pair split off a fused `tool` part.
struct ToolSplit {
    /// Synthetic `Tool` message that carries the result.
    message: Message,
    /// The `ToolResult` part belonging to `message`.
    result: Part,
}
576
577fn map_part(
578    session_id: &str,
579    message_id: &str,
580    ordinal: usize,
581    value: &Value,
582    message_ts: DateTime<Utc>,
583) -> Result<MappedPart, AdapterError> {
584    let kind = value.get("type").and_then(Value::as_str);
585    let id = value
586        .get("id")
587        .and_then(Value::as_str)
588        .ok_or_else(|| AdapterError::schema(NAME, message_id.to_owned(), "part missing `id`"))?
589        .to_owned();
590
591    if kind == Some("tool") {
592        return Ok(tool_part(
593            session_id, message_id, &id, ordinal, value, message_ts,
594        ));
595    }
596
597    let (provenance, part_kind) = match kind {
598        Some("text") => (text_provenance(value), text_kind(value)),
599        Some("reasoning") => (Provenance::Conversational, reasoning_kind(value)),
600        Some("file") => (Provenance::Conversational, file_kind(value)),
601        // patch / step-start / step-finish (and any other marker) are
602        // harness-produced turn machinery, not conversation: keep them as
603        // injected Parts whose `raw_record` round-trips the source file.
604        _ => (Provenance::Injected, PartKind::Text { text: None }),
605    };
606
607    Ok(MappedPart {
608        part: Part {
609            session_id: session_id.to_owned(),
610            id,
611            message_id: message_id.to_owned(),
612            ordinal: part_ordinal(ordinal),
613            provenance,
614            options: opencode_raw(value),
615            kind: part_kind,
616        },
617        tool_split: None,
618    })
619}
620
621/// spec.md#model-part-provenance: opencode marks harness-injected text parts
622/// (the `Called the <tool> tool ...` echo, auto-expanded `@file` content) with
623/// `synthetic: true`; a genuine prompt or model reply is `synthetic: false`.
624fn text_provenance(value: &Value) -> Provenance {
625    if value.get("synthetic").and_then(Value::as_bool) == Some(true) {
626        Provenance::Injected
627    } else {
628        Provenance::Conversational
629    }
630}
631
632fn text_kind(value: &Value) -> PartKind {
633    PartKind::Text {
634        text: extract_str(value, "text"),
635    }
636}
637
638fn reasoning_kind(value: &Value) -> PartKind {
639    PartKind::Reasoning {
640        text: extract_str(value, "text"),
641    }
642}
643
644fn file_kind(value: &Value) -> PartKind {
645    // spec.md#model-no-synthesis: an absent mime hint is faithfully `None`,
646    // not a synthesized `application/octet-stream` placeholder.
647    let media_type = value
648        .get("mime")
649        .and_then(Value::as_str)
650        .map(ToOwned::to_owned);
651    let file_name = value
652        .get("filename")
653        .and_then(Value::as_str)
654        .map(ToOwned::to_owned);
655    let data = match value.get("url").and_then(Value::as_str) {
656        Some(url) => FileData::Url(url.to_owned()),
657        None => FileData::String(compact_json(value)),
658    };
659    PartKind::File {
660        media_type,
661        file_name,
662        data,
663    }
664}
665
666/// Split one opencode `tool` part (call + result fused) into a `ToolCall` on the
667/// owning assistant message and a synthetic `Tool` message carrying the
668/// `ToolResult`. The `ToolCall` keeps the source part's id and stores its full
669/// `raw_record`, so native restore reproduces the single source file; the
670/// synthetic records carry no `raw_record` and are skipped on restore.
671fn tool_part(
672    session_id: &str,
673    message_id: &str,
674    id: &str,
675    ordinal: usize,
676    value: &Value,
677    message_ts: DateTime<Utc>,
678) -> MappedPart {
679    let call_id = extract_str(value, "callID");
680    let name = extract_str(value, "tool");
681    let state = value.get("state");
682    let status = state.and_then(|s| s.get("status")).and_then(Value::as_str);
683    let result_ts = millis_at(value, &["state", "time", "end"]).unwrap_or(message_ts);
684
685    // Take input/output by moving them out of a single owned `state` clone
686    // rather than cloning each field separately - on the real corpus a fused
687    // tool part fans into three records (call + tool message + result), each
688    // of which used to clone its own slice of `state`.
689    let mut owned_state = state.cloned().unwrap_or(Value::Null);
690    let (input, result) = match owned_state.as_object_mut() {
691        Some(map) => {
692            let input = map.remove("input").unwrap_or(Value::Null);
693            let result = map
694                .remove("output")
695                .or_else(|| map.remove("error"))
696                .unwrap_or_else(|| {
697                    // No output/error - the rest of `state` IS the payload.
698                    std::mem::take(&mut owned_state)
699                });
700            (input, result)
701        }
702        None => (Value::Null, Value::Null),
703    };
704
705    let tool_call = Part {
706        session_id: session_id.to_owned(),
707        id: id.to_owned(),
708        message_id: message_id.to_owned(),
709        ordinal: part_ordinal(ordinal),
710        // spec.md#model-part-provenance: the model authored the tool call.
711        provenance: Provenance::Conversational,
712        options: opencode_raw(value),
713        kind: PartKind::ToolCall {
714            call_id: call_id.clone(),
715            name: name.clone(),
716            params: input,
717            provider_executed: false,
718        },
719    };
720
721    let tool_message_id = format!("{id}/result");
722    let tool_message = Message::Tool {
723        id: tool_message_id.clone(),
724        session_id: session_id.to_owned(),
725        timestamp: result_ts,
726        options: synthetic_options(),
727    };
728    let result_part = Part {
729        session_id: session_id.to_owned(),
730        id: part_id(&tool_message_id, 0),
731        message_id: tool_message_id,
732        ordinal: 0,
733        // spec.md#model-part-provenance: tool output is runtime-produced.
734        provenance: Provenance::Injected,
735        options: synthetic_options(),
736        kind: PartKind::ToolResult {
737            call_id,
738            name,
739            is_failure: status == Some("error"),
740            result,
741        },
742    };
743
744    MappedPart {
745        part: tool_call,
746        tool_split: Some(ToolSplit {
747            message: tool_message,
748            result: result_part,
749        }),
750    }
751}
752
/// Provider options for a source record: `source_options` applied to `value`
/// under this adapter's name.
#[inline]
fn opencode_raw(value: &Value) -> ProviderOptions {
    source_options(NAME, value)
}
757
758/// Marks a canonical record the adapter synthesized (the `Tool` message and
759/// `ToolResult` split off a fused `tool` part). Native restore skips records so
760/// marked - they correspond to no source file.
761fn synthetic_options() -> ProviderOptions {
762    let mut options = ProviderOptions::new();
763    options.insert("opencode".to_owned(), json!({ "synthetic": true }));
764    options
765}
766
767fn millis_at(value: &Value, path: &[&str]) -> Option<DateTime<Utc>> {
768    let mut cursor = value;
769    for key in path {
770        cursor = cursor.get(key)?;
771    }
772    DateTime::from_timestamp_millis(cursor.as_i64()?)
773}
774
775fn is_synthetic(options: &ProviderOptions) -> bool {
776    options
777        .get("opencode")
778        .and_then(|o| o.get("synthetic"))
779        .and_then(Value::as_bool)
780        == Some(true)
781}
782
783fn serialize_native(
784    session: &crate::sessions::SessionWithMessages,
785) -> Result<Vec<RestoredFile>, AdapterError> {
786    // Native replays each stored `raw_record` at its original split path; the
787    // synthetic `Tool` message and `ToolResult` (which carry no `raw_record`)
788    // are skipped, re-fusing into the single source `tool` part. Replay echoes
789    // a frozen snapshot - safe only while canonical is append-only
790    // (spec.md#adapter-integrity-additive-sync).
791    //
792    // spec.md#adapter-native-restore-lossless: when the session lacks a stored
793    // `raw_record` (older ingest, foreign-sourced session), native is
794    // impossible. We downgrade to foreign and stamp `actual_fidelity` so the
795    // caller can surface the downgrade instead of getting a silent surprise.
796    let Some(session_raw) = raw_record(&session.session.options) else {
797        return serialize_foreign(session);
798    };
799    let project_id = session_raw
800        .get("projectID")
801        .and_then(Value::as_str)
802        .ok_or_else(|| {
803            AdapterError::schema(
804                NAME,
805                session.session.id.clone(),
806                "stored session raw_record missing projectID",
807            )
808        })?;
809
810    let mut files = vec![RestoredFile::new(
811        PathBuf::from("session")
812            .join(project_id)
813            .join(format!("{}.json", session.session.id)),
814        encode(&session_raw, &session.session.id)?,
815        RestoreFidelity::Native,
816    )];
817
818    for message in &session.messages {
819        if !is_synthetic(message.message.options())
820            && let Some(raw) = raw_record(message.message.options())
821        {
822            files.push(RestoredFile::new(
823                PathBuf::from("message")
824                    .join(&session.session.id)
825                    .join(format!("{}.json", message.message.id())),
826                encode(&raw, message.message.id())?,
827                RestoreFidelity::Native,
828            ));
829        }
830        for part in &message.parts {
831            // A part that carries a `raw_record` maps 1:1 to a source file at
832            // `part/<message_id>/<part_id>.json`; synthetic split parts do not.
833            if let Some(raw) = raw_record(&part.options) {
834                files.push(RestoredFile::new(
835                    PathBuf::from("part")
836                        .join(&part.message_id)
837                        .join(format!("{}.json", part.id)),
838                    encode(&raw, &part.id)?,
839                    RestoreFidelity::Native,
840                ));
841            }
842        }
843    }
844    Ok(files)
845}
846
847fn serialize_foreign(
848    session: &crate::sessions::SessionWithMessages,
849) -> Result<Vec<RestoredFile>, AdapterError> {
850    // Foreign restore: a best-effort, idiomatic opencode tree. A non-opencode
851    // session has no `projectID` hash, so derive a stable directory key from
852    // the project path; tool results (canonical `Tool` messages) and System
853    // carriers have no idiomatic home in opencode's part model and are dropped
854    // (spec.md#adapter-native-restore-lossless, foreign clause).
855    let project_id = encode_project(&session.session.project);
856    let created = session.session.created_at.timestamp_millis();
857    let session_record = json!({
858        "id": session.session.id,
859        "projectID": project_id,
860        "directory": &*session.session.project,
861        "time": { "created": created, "updated": created },
862    });
863    let mut files = vec![RestoredFile::new(
864        PathBuf::from("session")
865            .join(&project_id)
866            .join(format!("{}.json", session.session.id)),
867        encode(&session_record, &session.session.id)?,
868        RestoreFidelity::Foreign,
869    )];
870
871    for message in &session.messages {
872        let role = match message.message {
873            Message::User { .. } => "user",
874            Message::Assistant { .. } => "assistant",
875            // No idiomatic opencode home; content stays in canonical.
876            Message::Tool { .. } | Message::System { .. } => continue,
877        };
878        let created = message.message.timestamp().timestamp_millis();
879        let record = json!({
880            "id": message.message.id(),
881            "sessionID": session.session.id,
882            "role": role,
883            "time": { "created": created },
884        });
885        files.push(RestoredFile::new(
886            PathBuf::from("message")
887                .join(&session.session.id)
888                .join(format!("{}.json", message.message.id())),
889            encode(&record, message.message.id())?,
890            RestoreFidelity::Foreign,
891        ));
892        for part in &message.parts {
893            let Some(record) = foreign_part(&session.session.id, part) else {
894                continue;
895            };
896            files.push(RestoredFile::new(
897                PathBuf::from("part")
898                    .join(message.message.id())
899                    .join(format!("{}.json", part.id)),
900                encode(&record, &part.id)?,
901                RestoreFidelity::Foreign,
902            ));
903        }
904    }
905    Ok(files)
906}
907
908fn foreign_part(session_id: &str, part: &Part) -> Option<Value> {
909    let mut record = match &part.kind {
910        PartKind::Text { text } => json!({
911            "type": "text",
912            "text": text.as_deref().map(|t| &**t),
913            "synthetic": part.provenance == Provenance::Injected,
914        }),
915        PartKind::Reasoning { text } => json!({
916            "type": "reasoning",
917            "text": text.as_deref().map(|t| &**t),
918        }),
919        PartKind::File {
920            media_type,
921            file_name,
922            data,
923        } => json!({
924            "type": "file",
925            "mime": media_type,
926            "filename": file_name,
927            "url": match data {
928                FileData::Url(url) => Some(url.clone()),
929                _ => None,
930            },
931        }),
932        PartKind::ToolCall {
933            call_id,
934            name,
935            params,
936            ..
937        } => json!({
938            "type": "tool",
939            "callID": call_id.as_deref().map(|c| &**c),
940            "tool": name.as_deref().map(|n| &**n),
941            "state": { "status": "completed", "input": params },
942        }),
943        // ToolResult / approval parts have no standalone opencode shape.
944        _ => return None,
945    };
946    if let Value::Object(map) = &mut record {
947        map.insert("id".to_owned(), json!(part.id));
948        map.insert("sessionID".to_owned(), json!(session_id));
949        map.insert("messageID".to_owned(), json!(part.message_id));
950    }
951    Some(record)
952}
953
954fn encode(value: &Value, location: &str) -> Result<Vec<u8>, AdapterError> {
955    serde_json::to_vec(value).map_err(|error| {
956        AdapterError::schema(
957            NAME,
958            location.to_owned(),
959            format!("json encode failed: {error}"),
960        )
961    })
962}
963
/// Turns a project path into a directory-safe key: every character that is
/// not an ASCII letter or digit is replaced with `-`.
///
/// The mapping is deterministic but not injective (`/a/b` and `/a-b` yield the
/// same key).
fn encode_project(project: &str) -> String {
    // Each input char yields exactly one output char of at most the same
    // byte width, so the input length is an upper bound on the output length.
    let mut key = String::with_capacity(project.len());
    for ch in project.chars() {
        key.push(if ch.is_ascii_alphanumeric() { ch } else { '-' });
    }
    key
}
970
971#[cfg(test)]
972mod tests {
973    //! End-to-end test for the opencode adapter: ingest the committed
974    //! split-file fixture corpus and assert pond's canonical shape comes out
975    //! the other side, including the fused-tool-part split. The fixture lives
976    //! under `tests/fixtures/adapter/opencode/storage/`.
977    #![allow(clippy::expect_used, clippy::unwrap_used)]
978
979    use super::*;
980    use crate::{
981        adapter::extract::LEAF_CAP, handlers::ingest_adapter, sessions::Store, wire::PartKind,
982    };
983    use tempfile::TempDir;
984
985    // Manifest-dir anchored: unit tests must not depend on the process cwd
986    // (figment::Jail chdirs the whole test process while config tests run).
987    const FIXTURES: &str = concat!(
988        env!("CARGO_MANIFEST_DIR"),
989        "/tests/fixtures/adapter/opencode/storage"
990    );
991    const FRESH_SESSION_ID: &str = "ses_6405e5a5cffeIG2QHRuTmm4mA7";
992    const FRESH_MESSAGE_ID: &str = "msg_zzzzfresh0001";
993    const FRESH_PART_ID: &str = "prt_zzzzfresh0001";
994
995    struct FixedOracle {
996        session_id: &'static str,
997        ingested_at: DateTime<Utc>,
998    }
999
1000    impl crate::adapter::SkipOracle for FixedOracle {
1001        fn last_ingested_at(&self, session_id: &str) -> Option<DateTime<Utc>> {
1002            (session_id == self.session_id).then_some(self.ingested_at)
1003        }
1004    }
1005
    /// Delegates to the shared `assert_probe_default` helper, handing it the
    /// opencode factory and the path components
    /// `.local/share/opencode/storage`.
    #[test]
    fn probe_default_finds_opencode_storage_under_home() -> anyhow::Result<()> {
        crate::adapter::test_support::assert_probe_default(
            &OpencodeFactory,
            &[".local", "share", "opencode", "storage"],
        )
    }
1013
1014    #[tokio::test(flavor = "multi_thread")]
1015    async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
1016        let adapter = OpencodeAdapter::new(FIXTURES);
1017        crate::adapter::test_support::assert_native_restore(
1018            &OpencodeFactory,
1019            &adapter,
1020            // opencode relative paths are rooted at the `storage/` dir.
1021            std::path::Path::new(FIXTURES),
1022        )
1023        .await
1024    }
1025
1026    #[tokio::test(flavor = "multi_thread")]
1027    async fn freshness_uses_message_and_part_file_mtimes() -> anyhow::Result<()> {
1028        let temp = TempDir::new()?;
1029        let source = temp.path().join("storage");
1030        copy_dir(std::path::Path::new(FIXTURES), &source)?;
1031
1032        let store_dir = temp.path().join("store");
1033        let store = Store::open_local(&store_dir).await?;
1034        let adapter = OpencodeAdapter::new(&source);
1035        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1036
1037        let watermark = Utc::now();
1038        std::thread::sleep(std::time::Duration::from_millis(1100));
1039        append_fresh_opencode_turn(&source)?;
1040
1041        let oracle = FixedOracle {
1042            session_id: FRESH_SESSION_ID,
1043            ingested_at: watermark,
1044        };
1045        ingest_adapter(&store, &adapter, &oracle, |_| {}).await?;
1046
1047        let session = store
1048            .get_session(FRESH_SESSION_ID)
1049            .await?
1050            .expect("fixture session round-trips");
1051        let fresh = session
1052            .messages
1053            .iter()
1054            .find(|stored| stored.message.id() == FRESH_MESSAGE_ID)
1055            .expect("message added after the session file mtime must land");
1056        assert!(
1057            fresh.parts.iter().any(|part| matches!(
1058                &part.kind,
1059                PartKind::Text { text } if text.as_deref().map(|value| value.as_str()) == Some("fresh opencode text")
1060            )),
1061            "fresh message part must land with the re-read session",
1062        );
1063        Ok(())
1064    }
1065
1066    #[tokio::test(flavor = "multi_thread")]
1067    async fn malformed_part_file_drops_only_that_part() -> anyhow::Result<()> {
1068        let temp = TempDir::new()?;
1069        let source = temp.path().join("storage");
1070        write_minimal_session(&source, "ses_badpart", "msg_badpart")?;
1071        let part_dir = source.join("part").join("msg_badpart");
1072        std::fs::write(part_dir.join("prt_000_bad.json"), b"{not json")?;
1073        write_json_file(
1074            &part_dir.join("prt_999_good.json"),
1075            &json!({
1076                "id": "prt_999_good",
1077                "sessionID": "ses_badpart",
1078                "messageID": "msg_badpart",
1079                "type": "text",
1080                "text": "valid sibling survives",
1081                "synthetic": false,
1082            }),
1083        )?;
1084
1085        let store = Store::open_local(temp.path().join("store")).await?;
1086        let summary = ingest_adapter(
1087            &store,
1088            &OpencodeAdapter::new(&source),
1089            &crate::adapter::NoopOracle,
1090            |_| {},
1091        )
1092        .await?;
1093
1094        assert_eq!(summary.dropped_events, 1);
1095        let session = store
1096            .get_session("ses_badpart")
1097            .await?
1098            .expect("session with one malformed part still lands");
1099        let message = session
1100            .messages
1101            .iter()
1102            .find(|stored| stored.message.id() == "msg_badpart")
1103            .expect("message with valid sibling part still lands");
1104        assert!(message.parts.iter().any(|part| {
1105            matches!(
1106                &part.kind,
1107                PartKind::Text { text }
1108                    if text.as_deref().map(String::as_str) == Some("valid sibling survives")
1109            )
1110        }));
1111        Ok(())
1112    }
1113
1114    #[test]
1115    fn missing_message_timestamp_uses_session_anchor() -> anyhow::Result<()> {
1116        let session_anchor =
1117            DateTime::parse_from_rfc3339("2026-05-05T12:13:14Z")?.with_timezone(&Utc);
1118        let events = build_message_events(
1119            "ses_anchor",
1120            &json!({"id": "msg_no_time", "role": "user"}),
1121            &[],
1122            session_anchor,
1123        )?;
1124
1125        let IngestEvent::Message(message) = &events[0] else {
1126            panic!("first event is the message");
1127        };
1128        assert_eq!(message.timestamp(), session_anchor);
1129        Ok(())
1130    }
1131
1132    #[test]
1133    fn source_part_without_id_is_schema_error() {
1134        let session_anchor = DateTime::from_timestamp_millis(1_765_000_000_000).unwrap();
1135        let error = build_message_events(
1136            "ses_missing_part_id",
1137            &json!({
1138                "id": "msg_missing_part_id",
1139                "role": "assistant",
1140                "time": { "created": 1_765_000_000_000i64 },
1141            }),
1142            &[json!({"type": "text", "text": "cannot restore its filename"})],
1143            session_anchor,
1144        )
1145        .expect_err("part ids are required for native filename replay");
1146
1147        assert!(error.to_string().contains("part missing `id`"));
1148    }
1149
1150    #[test]
1151    fn read_json_bounds_oversized_string_leaves() -> anyhow::Result<()> {
1152        let temp = TempDir::new()?;
1153        let path = temp.path().join("oversized.json");
1154        write_json_file(
1155            &path,
1156            &json!({
1157                "id": "oversized",
1158                "text": "x".repeat(LEAF_CAP + 100),
1159            }),
1160        )?;
1161
1162        let value = read_json(&path)?;
1163        let text = value
1164            .get("text")
1165            .and_then(Value::as_str)
1166            .expect("text leaf survives as a bounded marker");
1167        assert!(text.len() <= LEAF_CAP);
1168        assert!(text.ends_with(&format!("{} bytes>", LEAF_CAP + 100)));
1169        Ok(())
1170    }
1171
1172    #[tokio::test(flavor = "multi_thread")]
1173    async fn opencode_adapter_ingests_fixture_corpus_into_canonical_shape() -> anyhow::Result<()> {
1174        let temp = TempDir::new()?;
1175        let store = Store::open_local(temp.path()).await?;
1176        let adapter = OpencodeAdapter::new(FIXTURES);
1177
1178        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1179        assert!(summary.accepted() > 0, "ingest must accept rows");
1180        assert_eq!(summary.dropped_events, 0, "no per-event drops expected");
1181        assert_eq!(
1182            summary.dropped_sessions, 0,
1183            "no session-level rejections expected"
1184        );
1185
1186        let (sessions, messages, parts) = store.row_counts().await?;
1187        assert!(sessions > 0, "at least one opencode session");
1188        assert!(messages > 0, "at least one opencode message");
1189        assert!(parts > 0, "at least one opencode Part");
1190
1191        let mut saw_call = false;
1192        let mut saw_result = false;
1193        let mut saw_injected_text = false;
1194        for session_id in store.session_ids().await? {
1195            let session = store
1196                .get_session(&session_id)
1197                .await?
1198                .expect("session round-trips");
1199            assert_eq!(session.session.source_agent, NAME);
1200            assert!(
1201                !(*session.session.project).is_empty(),
1202                "spec.md#model-project-non-empty",
1203            );
1204            for stored in &session.messages {
1205                for part in &stored.parts {
1206                    match &part.kind {
1207                        PartKind::ToolCall { .. } => saw_call = true,
1208                        PartKind::ToolResult { .. } => saw_result = true,
1209                        PartKind::Text { .. } if part.provenance == Provenance::Injected => {
1210                            saw_injected_text = true;
1211                        }
1212                        _ => {}
1213                    }
1214                }
1215            }
1216        }
1217        assert!(saw_call, "fused tool parts yield ToolCall on the assistant");
1218        assert!(
1219            saw_result,
1220            "fused tool parts split off a ToolResult on a Tool message",
1221        );
1222        assert!(
1223            saw_injected_text,
1224            "spec.md#model-part-provenance: synthetic text parts are injected",
1225        );
1226        Ok(())
1227    }
1228
1229    /// The synthetic `Tool` message a `tool` part splits off must carry a
1230    /// `Tool` role with one `ToolResult`, and must NOT collide with or
1231    /// overwrite the assistant message that owns the `ToolCall`.
1232    #[tokio::test(flavor = "multi_thread")]
1233    async fn fused_tool_part_splits_into_call_and_result() -> anyhow::Result<()> {
1234        let temp = TempDir::new()?;
1235        let store = Store::open_local(temp.path()).await?;
1236        let adapter = OpencodeAdapter::new(FIXTURES);
1237        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1238
1239        let mut call_ids = std::collections::HashSet::new();
1240        let mut result_ids = std::collections::HashSet::new();
1241        let mut saw_failure = false;
1242        for session_id in store.session_ids().await? {
1243            let session = store
1244                .get_session(&session_id)
1245                .await?
1246                .expect("session round-trips");
1247            for stored in &session.messages {
1248                for part in &stored.parts {
1249                    match &part.kind {
1250                        PartKind::ToolCall { call_id, .. } => {
1251                            if let Some(id) = call_id.as_deref() {
1252                                call_ids.insert(id.clone());
1253                            }
1254                        }
1255                        PartKind::ToolResult {
1256                            call_id,
1257                            is_failure,
1258                            result,
1259                            ..
1260                        } => {
1261                            assert!(
1262                                matches!(stored.message, Message::Tool { .. }),
1263                                "a ToolResult must live on a Tool-role message",
1264                            );
1265                            if *is_failure {
1266                                saw_failure = true;
1267                                assert_ne!(
1268                                    result,
1269                                    &Value::Null,
1270                                    "failed tool results must carry the source error/output payload",
1271                                );
1272                            }
1273                            if let Some(id) = call_id.as_deref() {
1274                                result_ids.insert(id.clone());
1275                            }
1276                        }
1277                        _ => {}
1278                    }
1279                }
1280            }
1281        }
1282        assert!(!call_ids.is_empty(), "corpus has tool calls");
1283        assert_eq!(
1284            call_ids, result_ids,
1285            "every tool call's id is matched by its split-off result",
1286        );
1287        assert!(
1288            saw_failure,
1289            "fixture has at least one failed opencode tool result"
1290        );
1291        Ok(())
1292    }
1293
1294    #[tokio::test(flavor = "multi_thread")]
1295    async fn foreign_serialization_reparses_as_opencode() -> anyhow::Result<()> {
1296        let temp = TempDir::new()?;
1297        let origin_store = Store::open_local(temp.path().join("origin-store")).await?;
1298        let origin = crate::adapter::PiCodingAgentAdapter::new(concat!(
1299            env!("CARGO_MANIFEST_DIR"),
1300            "/tests/fixtures/adapter/pi-coding-agent/sessions"
1301        ));
1302        ingest_adapter(&origin_store, &origin, &crate::adapter::NoopOracle, |_| {}).await?;
1303        let session_id = origin_store
1304            .session_ids()
1305            .await?
1306            .into_iter()
1307            .next()
1308            .expect("pi fixture has sessions");
1309        let session = origin_store
1310            .get_session(&session_id)
1311            .await?
1312            .expect("fixture session is readable");
1313
1314        let restored_root = temp.path().join("opencode-storage");
1315        crate::adapter::write_restored_files(
1316            &restored_root,
1317            OpencodeFactory.serialize(&session, RestoreFidelity::Foreign)?,
1318        )?;
1319        let restored_store = Store::open_local(temp.path().join("restored-store")).await?;
1320        let summary = ingest_adapter(
1321            &restored_store,
1322            &OpencodeAdapter::new(&restored_root),
1323            &crate::adapter::NoopOracle,
1324            |_| {},
1325        )
1326        .await?;
1327
1328        assert!(summary.accepted() > 0);
1329        assert_eq!(summary.dropped_events, 0);
1330        Ok(())
1331    }
1332
1333    #[test]
1334    fn path_ids_reject_separators_and_traversal() {
1335        let where_ = "session/project/session.json";
1336        assert!(validate_path_id(NAME, "session id", "ses_safe", where_).is_ok());
1337        assert!(validate_path_id(NAME, "session id", "../ses", where_).is_err());
1338        assert!(validate_path_id(NAME, "session id", "/tmp/ses", where_).is_err());
1339        assert!(validate_path_id(NAME, "message id", "msg/a", where_).is_err());
1340        assert!(validate_path_id(NAME, "message id", "msg\\a", where_).is_err());
1341    }
1342
1343    fn append_fresh_opencode_turn(root: &std::path::Path) -> anyhow::Result<()> {
1344        let message_dir = root.join("message").join(FRESH_SESSION_ID);
1345        let part_dir = root.join("part").join(FRESH_MESSAGE_ID);
1346        std::fs::create_dir_all(&message_dir)?;
1347        std::fs::create_dir_all(&part_dir)?;
1348        std::fs::write(
1349            message_dir.join(format!("{FRESH_MESSAGE_ID}.json")),
1350            serde_json::to_vec(&json!({
1351                "id": FRESH_MESSAGE_ID,
1352                "sessionID": FRESH_SESSION_ID,
1353                "role": "user",
1354                "time": { "created": 1759859999000i64 }
1355            }))?,
1356        )?;
1357        std::fs::write(
1358            part_dir.join(format!("{FRESH_PART_ID}.json")),
1359            serde_json::to_vec(&json!({
1360                "id": FRESH_PART_ID,
1361                "sessionID": FRESH_SESSION_ID,
1362                "messageID": FRESH_MESSAGE_ID,
1363                "type": "text",
1364                "text": "fresh opencode text",
1365                "synthetic": false
1366            }))?,
1367        )?;
1368        Ok(())
1369    }
1370
1371    fn write_minimal_session(
1372        root: &std::path::Path,
1373        session_id: &str,
1374        message_id: &str,
1375    ) -> anyhow::Result<()> {
1376        write_json_file(
1377            &root
1378                .join("session")
1379                .join("project")
1380                .join(format!("{session_id}.json")),
1381            &json!({
1382                "id": session_id,
1383                "projectID": "project",
1384                "directory": "/tmp/project",
1385                "time": { "created": 1_765_000_000_000i64, "updated": 1_765_000_000_000i64 },
1386            }),
1387        )?;
1388        write_json_file(
1389            &root
1390                .join("message")
1391                .join(session_id)
1392                .join(format!("{message_id}.json")),
1393            &json!({
1394                "id": message_id,
1395                "sessionID": session_id,
1396                "role": "assistant",
1397                "time": { "created": 1_765_000_000_001i64 },
1398            }),
1399        )?;
1400        std::fs::create_dir_all(root.join("part").join(message_id))?;
1401        Ok(())
1402    }
1403
1404    fn write_json_file(path: &std::path::Path, value: &Value) -> anyhow::Result<()> {
1405        if let Some(parent) = path.parent() {
1406            std::fs::create_dir_all(parent)?;
1407        }
1408        std::fs::write(path, serde_json::to_vec(value)?)?;
1409        Ok(())
1410    }
1411
1412    fn copy_dir(from: &std::path::Path, to: &std::path::Path) -> anyhow::Result<()> {
1413        std::fs::create_dir_all(to)?;
1414        for entry in std::fs::read_dir(from)? {
1415            let entry = entry?;
1416            let source = entry.path();
1417            let target = to.join(entry.file_name());
1418            if entry.file_type()?.is_dir() {
1419                copy_dir(&source, &target)?;
1420            } else {
1421                std::fs::copy(&source, &target)?;
1422            }
1423        }
1424        Ok(())
1425    }
1426}