Skip to main content

pond/adapter/
opencode.rs

1//! opencode adapter (github.com/sst/opencode).
2//!
3//! Unlike claude-code and codex-cli, opencode does not store one JSONL file per
4//! session. It uses a content-addressed split layout under a `storage/` root:
5//!
6//! - `session/<projectID>/<sessionID>.json` - session metadata
7//! - `message/<sessionID>/<messageID>.json` - one message header (no content)
8//! - `part/<messageID>/<partID>.json`        - one content part
9//!
10//! So this is the first adapter to drive the [`Adapter`] seam directly rather
11//! than through the JSONL helper: it walks the three levels, sorts by id (the
12//! ids are lexically time-sortable, so filename order is creation order), and
13//! emits `Session -> Message -> Parts` per session.
14//!
15//! opencode fuses a tool call and its result into one `tool` part on the
16//! assistant message. Canonical keeps the two apart (a `tool_result` on an
17//! assistant message is a category error, spec.md#model-part-provenance), so the
18//! adapter splits it: a `ToolCall` Part stays on the assistant message and a
19//! synthetic `Tool` message carries the `ToolResult`. Native restore replays
20//! each real part's stored `raw_record` at its original path and skips the
21//! synthetic records, so the split is value-complete-lossless.
22
23use std::path::{Path, PathBuf};
24
25use async_stream::stream;
26use chrono::{DateTime, Utc};
27use serde_json::{Value, json};
28use tokio::sync::mpsc;
29
30use crate::{
31    sessions::IngestEvent,
32    wire::{FileData, Message, Part, PartKind, Provenance, ProviderOptions, Session},
33};
34
35use super::{
36    Adapter, AdapterError, AdapterFactory, AdapterYield, AdapterYieldStream, DiscoverFuture, Env,
37    RestoreFidelity, RestoredFile, SkipOracle, SkipReason, compact_json, config_path,
38    extract::{bound_value, extract_str},
39    jsonl::RECORD_CAP,
40    part_id, part_ordinal, raw_record, source_options, validate_path_id,
41};
42
43const NAME: &str = "opencode";
44
45/// Event-channel bound; doubles as backpressure - the blocking reader parks on
46/// `blocking_send` when the consumer lags.
47const CHANNEL_CAP: usize = 256;
48
49/// Stateless factory: opens [`OpencodeAdapter`] instances and probes for the
50/// canonical install location under `~/.local/share/opencode/storage`.
51pub struct OpencodeFactory;
52
53impl AdapterFactory for OpencodeFactory {
54    fn name(&self) -> &'static str {
55        NAME
56    }
57
58    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
59        Ok(Box::new(OpencodeAdapter::new(config_path(NAME, config)?)))
60    }
61
62    fn probe_default(&self, env: &Env) -> Option<Value> {
63        let path = env
64            .home
65            .join(".local")
66            .join("share")
67            .join("opencode")
68            .join("storage");
69        path.exists().then(|| json!({ "path": path }))
70    }
71
72    fn serialize(
73        &self,
74        session: &crate::sessions::SessionWithMessages,
75        fidelity: RestoreFidelity,
76    ) -> Result<Vec<RestoredFile>, AdapterError> {
77        match fidelity {
78            RestoreFidelity::Native => serialize_native(session),
79            RestoreFidelity::Foreign => serialize_foreign(session),
80        }
81    }
82}
83
84/// Configured opencode reader, rooted at a `storage/` directory.
85#[derive(Debug, Clone)]
86pub struct OpencodeAdapter {
87    root: PathBuf,
88}
89
90impl OpencodeAdapter {
91    pub fn new(root: impl Into<PathBuf>) -> Self {
92        Self { root: root.into() }
93    }
94}
95
96impl Adapter for OpencodeAdapter {
97    fn discover(&self) -> DiscoverFuture<'_> {
98        let root = self.root.clone();
99        Box::pin(async move {
100            tokio::task::spawn_blocking(move || {
101                collect_session_files(&root).map(|files| files.len())
102            })
103            .await
104            .map_err(join_error)?
105        })
106    }
107
108    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
109        let adapter = self.clone();
110        Box::pin(stream! {
111            let files = {
112                let root = adapter.root.clone();
113                tokio::task::spawn_blocking(move || collect_session_files(&root)).await
114            };
115            let files = match files {
116                Ok(Ok(files)) => files,
117                Ok(Err(error)) => { yield Err(error); return; }
118                Err(join) => { yield Err(join_error(join)); return; }
119            };
120
121            // Subtree mtime walks happen ONLY when the oracle has a watermark
122            // for this session - on a first ingest (or NoopOracle) every walk
123            // is wasted work since there's nothing to compare against.
124            let mut survivors = Vec::with_capacity(files.len());
125            for mut file in files {
126                if let Some(ingested) = oracle.last_ingested_at(&file.session_id) {
127                    let walk = {
128                        let root = adapter.root.clone();
129                        let session_path = file.path.clone();
130                        let session_id = file.session_id.clone();
131                        tokio::task::spawn_blocking(move || {
132                            walk_session_subtree(&root, &session_path, &session_id)
133                        })
134                        .await
135                    };
136                    let walk = match walk {
137                        Ok(Ok(walk)) => walk,
138                        Ok(Err(error)) => { yield Err(error); return; }
139                        Err(join) => { yield Err(join_error(join)); return; }
140                    };
141                    if let Some(mtime) = walk.newest_mtime
142                        && mtime <= ingested
143                    {
144                        yield Ok(AdapterYield::Skipped {
145                            session_id: Some(file.session_id.clone()),
146                            project: None,
147                            reason: SkipReason::Fresh,
148                        });
149                        continue;
150                    }
151                    // Cache the walk so the read pass doesn't re-list the
152                    // same dirs (we already paid for them above).
153                    file.cached_subtree = Some(walk);
154                }
155                survivors.push(file);
156            }
157
158            let (tx, mut rx) = mpsc::channel(CHANNEL_CAP);
159            let reader = adapter.clone();
160            let handle = tokio::task::spawn_blocking(move || read_sessions(&reader, survivors, &tx));
161            while let Some(item) = rx.recv().await {
162                yield item;
163            }
164            if let Err(join) = handle.await {
165                yield Err(join_error(join));
166            }
167        })
168    }
169}
170
171/// A blocking-task panic is a pond bug, not bad source data, so it fails the
172/// whole run rather than skipping a session.
173fn join_error(join: tokio::task::JoinError) -> AdapterError {
174    AdapterError::io(
175        NAME,
176        "blocking read task",
177        std::io::Error::other(join.to_string()),
178    )
179}
180
181/// One session file located on disk. `cached_subtree` is populated only when
182/// the freshness pre-walk happened (i.e. the oracle had a watermark for this
183/// session); the read pass reuses the listings instead of re-walking.
184struct SessionFile {
185    session_id: String,
186    path: PathBuf,
187    cached_subtree: Option<SubtreeWalk>,
188}
189
190/// Result of one subtree walk: the newest mtime across session+messages+parts
191/// (for the freshness check) and the directory listings (so the read pass
192/// doesn't redo them).
193struct SubtreeWalk {
194    newest_mtime: Option<DateTime<Utc>>,
195    message_files: Vec<PathBuf>,
196    /// One entry per message file, in the same order; each is the sorted list
197    /// of part files for that message. Empty vec = message has no parts.
198    part_files_by_message: Vec<Vec<PathBuf>>,
199}
200
201/// Walk `<root>/session/<projectID>/<sessionID>.json`, sorted for deterministic
202/// ingest order. A missing `session/` dir means "no sessions yet", not an error.
203fn collect_session_files(root: &Path) -> Result<Vec<SessionFile>, AdapterError> {
204    let session_root = root.join("session");
205    let io = |path: &Path, source| AdapterError::io(NAME, path.display().to_string(), source);
206    let entries = match std::fs::read_dir(&session_root) {
207        Ok(entries) => entries,
208        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
209        Err(error) => return Err(io(&session_root, error)),
210    };
211    let mut out = Vec::new();
212    for project in entries {
213        let project = project.map_err(|error| io(&session_root, error))?;
214        if !project
215            .file_type()
216            .map_err(|error| io(&project.path(), error))?
217            .is_dir()
218        {
219            continue;
220        }
221        let project_dir = project.path();
222        for session in std::fs::read_dir(&project_dir).map_err(|error| io(&project_dir, error))? {
223            let session = session.map_err(|error| io(&project_dir, error))?;
224            let path = session.path();
225            if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
226                continue;
227            }
228            let Some(session_id) = path
229                .file_stem()
230                .and_then(|s| s.to_str())
231                .map(ToOwned::to_owned)
232            else {
233                continue;
234            };
235            validate_path_id(
236                NAME,
237                "session file name",
238                &session_id,
239                path.display().to_string(),
240            )?;
241            out.push(SessionFile {
242                session_id,
243                path,
244                cached_subtree: None,
245            });
246        }
247    }
248    out.sort_by(|a, b| a.path.cmp(&b.path));
249    Ok(out)
250}
251
252/// Walk one session's full subtree: the session file, every message file under
253/// `message/<sid>/`, every part file under `part/<mid>/`. Returns the newest
254/// mtime seen plus the cached listings so the read pass can reuse them.
255fn walk_session_subtree(
256    root: &Path,
257    session_path: &Path,
258    session_id: &str,
259) -> Result<SubtreeWalk, AdapterError> {
260    let mut newest = json_mtime(session_path);
261    let message_dir = root.join("message").join(session_id);
262    let message_files = list_json_sorted(&message_dir)?;
263    let mut part_files_by_message = Vec::with_capacity(message_files.len());
264    for message_path in &message_files {
265        newest = newest.max(json_mtime(message_path));
266        let Some(message_id) = message_path.file_stem().and_then(|stem| stem.to_str()) else {
267            part_files_by_message.push(Vec::new());
268            continue;
269        };
270        validate_path_id(
271            NAME,
272            "message file name",
273            message_id,
274            message_path.display().to_string(),
275        )?;
276        let part_dir = root.join("part").join(message_id);
277        let parts = list_json_sorted(&part_dir)?;
278        for part_path in &parts {
279            newest = newest.max(json_mtime(part_path));
280        }
281        part_files_by_message.push(parts);
282    }
283    Ok(SubtreeWalk {
284        newest_mtime: newest,
285        message_files,
286        part_files_by_message,
287    })
288}
289
290fn json_mtime(path: &Path) -> Option<DateTime<Utc>> {
291    std::fs::metadata(path)
292        .and_then(|meta| meta.modified())
293        .ok()
294        .map(DateTime::<Utc>::from)
295}
296
297fn read_sessions(
298    adapter: &OpencodeAdapter,
299    sessions: Vec<SessionFile>,
300    tx: &mpsc::Sender<Result<AdapterYield, AdapterError>>,
301) {
302    for session in sessions {
303        if !read_one_session(adapter, session, tx) {
304            return;
305        }
306    }
307}
308
309/// Returns `false` when the consumer dropped the receiver and the read should stop.
310fn read_one_session(
311    adapter: &OpencodeAdapter,
312    file: SessionFile,
313    tx: &mpsc::Sender<Result<AdapterYield, AdapterError>>,
314) -> bool {
315    macro_rules! emit {
316        ($item:expr) => {
317            if tx.blocking_send($item).is_err() {
318                return false;
319            }
320        };
321    }
322
323    let session_value = match read_json(&file.path) {
324        Ok(value) => value,
325        Err(error) => {
326            emit!(Err(error));
327            return true;
328        }
329    };
330    let session = match session_from_value(&session_value, &file.path) {
331        Ok(session) => session,
332        Err(error) => {
333            emit!(Err(error));
334            return true;
335        }
336    };
337    let session_id = session.id.clone();
338    if let Err(error) = validate_path_id(
339        NAME,
340        "session id",
341        &session_id,
342        file.path.display().to_string(),
343    ) {
344        emit!(Err(error));
345        return true;
346    }
347    let session_created_at = session.created_at;
348    emit!(Ok(AdapterYield::Event(IngestEvent::Session(session))));
349
350    // Reuse the freshness pre-walk's listings when present; otherwise list now.
351    let (message_files, mut part_files_by_message) = match file.cached_subtree {
352        Some(walk) => (walk.message_files, walk.part_files_by_message),
353        None => {
354            let message_dir = adapter.root.join("message").join(&session_id);
355            let files = match list_json_sorted(&message_dir) {
356                Ok(files) => files,
357                Err(error) => {
358                    emit!(Err(error));
359                    return true;
360                }
361            };
362            (files, Vec::new())
363        }
364    };
365    let use_cache = !part_files_by_message.is_empty();
366
367    for (index, message_path) in message_files.iter().enumerate() {
368        let message_value = match read_json(message_path) {
369            Ok(value) => value,
370            Err(error) => {
371                emit!(Err(error));
372                continue;
373            }
374        };
375        let Some(message_id) = message_value.get("id").and_then(Value::as_str) else {
376            emit!(Err(AdapterError::schema(
377                NAME,
378                message_path.display().to_string(),
379                "message file missing `id`",
380            )));
381            continue;
382        };
383        if let Err(error) = validate_path_id(
384            NAME,
385            "message id",
386            message_id,
387            message_path.display().to_string(),
388        ) {
389            emit!(Err(error));
390            continue;
391        }
392        let part_files = if use_cache {
393            std::mem::take(&mut part_files_by_message[index])
394        } else {
395            let part_dir = adapter.root.join("part").join(message_id);
396            match list_json_sorted(&part_dir) {
397                Ok(files) => files,
398                Err(error) => {
399                    emit!(Err(error));
400                    continue;
401                }
402            }
403        };
404        let mut parts = Vec::with_capacity(part_files.len());
405        for part_path in part_files {
406            match read_json(&part_path) {
407                Ok(value) => parts.push(value),
408                Err(error) => emit!(Err(error)),
409            }
410        }
411        match build_message_events(&session_id, &message_value, &parts, session_created_at) {
412            Ok(events) => {
413                for event in events {
414                    emit!(Ok(AdapterYield::Event(event)));
415                }
416            }
417            Err(error) => emit!(Err(error)),
418        }
419    }
420    true
421}
422
423/// Read one JSON file, bounding every string leaf at the seam cap
424/// (spec.md#adapter-bounded-values) before it leaves this module. One open +
425/// one metadata syscall on the handle - avoids the duplicate `std::fs::metadata`
426/// + `std::fs::read` pair on the ~1k-file real corpus.
427fn read_json(path: &Path) -> Result<Value, AdapterError> {
428    use std::io::Read;
429    let io = |source| AdapterError::io(NAME, path.display().to_string(), source);
430    let mut file = std::fs::File::open(path).map_err(io)?;
431    let len = file.metadata().map_err(io)?.len();
432    if len > RECORD_CAP as u64 {
433        return Err(AdapterError::schema(
434            NAME,
435            path.display().to_string(),
436            format!("json file exceeds adapter record cap: {len} bytes > {RECORD_CAP}"),
437        ));
438    }
439    let mut bytes = Vec::with_capacity(len as usize);
440    file.read_to_end(&mut bytes).map_err(io)?;
441    let mut value: Value = serde_json::from_slice(&bytes)
442        .map_err(|error| AdapterError::parse(NAME, path.display().to_string(), 1, error))?;
443    bound_value(&mut value);
444    Ok(value)
445}
446
447/// List `*.json` files in `dir`, sorted by filename (= creation order, ids are
448/// time-sortable). A missing dir is an empty list - a message can legitimately
449/// carry no parts.
450fn list_json_sorted(dir: &Path) -> Result<Vec<PathBuf>, AdapterError> {
451    let entries = match std::fs::read_dir(dir) {
452        Ok(entries) => entries,
453        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
454        Err(error) => return Err(AdapterError::io(NAME, dir.display().to_string(), error)),
455    };
456    let mut out = Vec::new();
457    for entry in entries {
458        let entry =
459            entry.map_err(|error| AdapterError::io(NAME, dir.display().to_string(), error))?;
460        let path = entry.path();
461        if path.extension().and_then(|ext| ext.to_str()) == Some("json") {
462            out.push(path);
463        }
464    }
465    out.sort();
466    Ok(out)
467}
468
469fn session_from_value(value: &Value, path: &Path) -> Result<Session, AdapterError> {
470    let display = path.display().to_string();
471    let id = value
472        .get("id")
473        .and_then(Value::as_str)
474        .ok_or_else(|| AdapterError::schema(NAME, display.clone(), "session missing `id`"))?
475        .to_owned();
476    let created_at = millis_at(value, &["time", "created"]).ok_or_else(|| {
477        AdapterError::schema(NAME, display.clone(), "session missing `time.created`")
478    })?;
479    // spec.md#model-project-non-empty: opencode always records `directory` (the
480    // project cwd); its absence is a malformed session, not a default.
481    let project = extract_str(value, "directory")
482        .ok_or_else(|| AdapterError::schema(NAME, display, "session missing `directory`"))?;
483
484    let options = opencode_raw(value);
485
486    Ok(Session {
487        id,
488        // opencode sub-sessions (a Task spawn) carry `parentID`; a soft
489        // reference, present only when this session was spawned from another.
490        parent_session_id: value
491            .get("parentID")
492            .and_then(Value::as_str)
493            .map(ToOwned::to_owned),
494        parent_message_id: None,
495        source_agent: NAME.to_owned(),
496        created_at,
497        project,
498        options,
499    })
500}
501
502/// Build the ordered event stream for one message: the message, its parts in
503/// order, then any synthetic `Tool` messages (one per `tool` part) each
504/// followed by its `ToolResult`.
505fn build_message_events(
506    session_id: &str,
507    message_value: &Value,
508    part_values: &[Value],
509    default_timestamp: DateTime<Utc>,
510) -> Result<Vec<IngestEvent>, AdapterError> {
511    let message_id = message_value
512        .get("id")
513        .and_then(Value::as_str)
514        .ok_or_else(|| AdapterError::schema(NAME, session_id.to_owned(), "message missing `id`"))?;
515    let role = message_value.get("role").and_then(Value::as_str);
516    let timestamp = millis_at(message_value, &["time", "created"]).unwrap_or(default_timestamp);
517
518    let options = opencode_raw(message_value);
519    let message = match role {
520        Some("user") => Message::User {
521            id: message_id.to_owned(),
522            session_id: session_id.to_owned(),
523            timestamp,
524            options,
525        },
526        Some("assistant") => Message::Assistant {
527            id: message_id.to_owned(),
528            session_id: session_id.to_owned(),
529            timestamp,
530            options,
531        },
532        // opencode v2 carries non-conversational message roles (synthetic,
533        // shell, compaction); keep them as System carriers rather than drop
534        // them (spec.md#adapter-integrity-no-silent-drops). The raw record
535        // survives in options; the role label is the content.
536        _ => Message::System {
537            id: message_id.to_owned(),
538            session_id: session_id.to_owned(),
539            timestamp,
540            content: extract_str(message_value, "role"),
541            options,
542        },
543    };
544
545    let mut events = vec![IngestEvent::Message(message)];
546    let mut deferred = Vec::new();
547    for (ordinal, part_value) in part_values.iter().enumerate() {
548        let mapped = map_part(session_id, message_id, ordinal, part_value, timestamp)?;
549        events.push(IngestEvent::Part(mapped.part));
550        if let Some(split) = mapped.tool_split {
551            deferred.push(split);
552        }
553    }
554    for ToolSplit {
555        message: tool_message,
556        result,
557    } in deferred
558    {
559        events.push(IngestEvent::Message(tool_message));
560        events.push(IngestEvent::Part(result));
561    }
562    Ok(events)
563}
564
565/// One mapped source part: the canonical Part it becomes, plus - for a fused
566/// `tool` part - the synthetic `Tool` message and `ToolResult` it splits off.
567struct MappedPart {
568    part: Part,
569    tool_split: Option<ToolSplit>,
570}
571
572struct ToolSplit {
573    message: Message,
574    result: Part,
575}
576
577fn map_part(
578    session_id: &str,
579    message_id: &str,
580    ordinal: usize,
581    value: &Value,
582    message_ts: DateTime<Utc>,
583) -> Result<MappedPart, AdapterError> {
584    let kind = value.get("type").and_then(Value::as_str);
585    let id = value
586        .get("id")
587        .and_then(Value::as_str)
588        .ok_or_else(|| AdapterError::schema(NAME, message_id.to_owned(), "part missing `id`"))?
589        .to_owned();
590
591    if kind == Some("tool") {
592        return Ok(tool_part(
593            session_id, message_id, &id, ordinal, value, message_ts,
594        ));
595    }
596
597    let (provenance, part_kind) = match kind {
598        Some("text") => (text_provenance(value), text_kind(value)),
599        Some("reasoning") => (Provenance::Conversational, reasoning_kind(value)),
600        Some("file") => (Provenance::Conversational, file_kind(value)),
601        // patch / step-start / step-finish (and any other marker) are
602        // harness-produced turn machinery, not conversation: keep them as
603        // injected Parts whose `raw_record` round-trips the source file.
604        _ => (Provenance::Injected, PartKind::Text { text: None }),
605    };
606
607    Ok(MappedPart {
608        part: Part {
609            session_id: session_id.to_owned(),
610            id,
611            message_id: message_id.to_owned(),
612            ordinal: part_ordinal(ordinal),
613            provenance,
614            options: opencode_raw(value),
615            kind: part_kind,
616        },
617        tool_split: None,
618    })
619}
620
621/// spec.md#model-part-provenance: opencode marks harness-injected text parts
622/// (the `Called the <tool> tool ...` echo, auto-expanded `@file` content) with
623/// `synthetic: true`; a genuine prompt or model reply is `synthetic: false`.
624fn text_provenance(value: &Value) -> Provenance {
625    if value.get("synthetic").and_then(Value::as_bool) == Some(true) {
626        Provenance::Injected
627    } else {
628        Provenance::Conversational
629    }
630}
631
632fn text_kind(value: &Value) -> PartKind {
633    PartKind::Text {
634        text: extract_str(value, "text"),
635    }
636}
637
638fn reasoning_kind(value: &Value) -> PartKind {
639    PartKind::Reasoning {
640        text: extract_str(value, "text"),
641    }
642}
643
644fn file_kind(value: &Value) -> PartKind {
645    // spec.md#model-no-synthesis: an absent mime hint is faithfully `None`,
646    // not a synthesized `application/octet-stream` placeholder.
647    let media_type = value
648        .get("mime")
649        .and_then(Value::as_str)
650        .map(ToOwned::to_owned);
651    let file_name = value
652        .get("filename")
653        .and_then(Value::as_str)
654        .map(ToOwned::to_owned);
655    let data = match value.get("url").and_then(Value::as_str) {
656        Some(url) => FileData::Url(url.to_owned()),
657        None => FileData::String(compact_json(value)),
658    };
659    PartKind::File {
660        media_type,
661        file_name,
662        data,
663    }
664}
665
666/// Split one opencode `tool` part (call + result fused) into a `ToolCall` on the
667/// owning assistant message and a synthetic `Tool` message carrying the
668/// `ToolResult`. The `ToolCall` keeps the source part's id and stores its full
669/// `raw_record`, so native restore reproduces the single source file; the
670/// synthetic records carry no `raw_record` and are skipped on restore.
671fn tool_part(
672    session_id: &str,
673    message_id: &str,
674    id: &str,
675    ordinal: usize,
676    value: &Value,
677    message_ts: DateTime<Utc>,
678) -> MappedPart {
679    let call_id = extract_str(value, "callID");
680    let name = extract_str(value, "tool");
681    let state = value.get("state");
682    let status = state.and_then(|s| s.get("status")).and_then(Value::as_str);
683    let result_ts = millis_at(value, &["state", "time", "end"]).unwrap_or(message_ts);
684
685    // Take input/output by moving them out of a single owned `state` clone
686    // rather than cloning each field separately - on the real corpus a fused
687    // tool part fans into three records (call + tool message + result), each
688    // of which used to clone its own slice of `state`.
689    let mut owned_state = state.cloned().unwrap_or(Value::Null);
690    let (input, result) = match owned_state.as_object_mut() {
691        Some(map) => {
692            let input = map.remove("input").unwrap_or(Value::Null);
693            let result = map
694                .remove("output")
695                .or_else(|| map.remove("error"))
696                .unwrap_or_else(|| {
697                    // No output/error - the rest of `state` IS the payload.
698                    std::mem::take(&mut owned_state)
699                });
700            (input, result)
701        }
702        None => (Value::Null, Value::Null),
703    };
704
705    let tool_call = Part {
706        session_id: session_id.to_owned(),
707        id: id.to_owned(),
708        message_id: message_id.to_owned(),
709        ordinal: part_ordinal(ordinal),
710        // spec.md#model-part-provenance: the model authored the tool call.
711        provenance: Provenance::Conversational,
712        options: opencode_raw(value),
713        kind: PartKind::ToolCall {
714            call_id: call_id.clone(),
715            name: name.clone(),
716            params: input,
717            provider_executed: false,
718        },
719    };
720
721    let tool_message_id = format!("{id}/result");
722    let tool_message = Message::Tool {
723        id: tool_message_id.clone(),
724        session_id: session_id.to_owned(),
725        timestamp: result_ts,
726        options: synthetic_options(),
727    };
728    let result_part = Part {
729        session_id: session_id.to_owned(),
730        id: part_id(&tool_message_id, 0),
731        message_id: tool_message_id,
732        ordinal: 0,
733        // spec.md#model-part-provenance: tool output is runtime-produced.
734        provenance: Provenance::Injected,
735        options: synthetic_options(),
736        kind: PartKind::ToolResult {
737            call_id,
738            name,
739            is_failure: status == Some("error"),
740            result,
741        },
742    };
743
744    MappedPart {
745        part: tool_call,
746        tool_split: Some(ToolSplit {
747            message: tool_message,
748            result: result_part,
749        }),
750    }
751}
752
753#[inline]
754fn opencode_raw(value: &Value) -> ProviderOptions {
755    source_options(NAME, value)
756}
757
758/// Marks a canonical record the adapter synthesized (the `Tool` message and
759/// `ToolResult` split off a fused `tool` part). Native restore skips records so
760/// marked - they correspond to no source file.
761fn synthetic_options() -> ProviderOptions {
762    let mut options = ProviderOptions::new();
763    options.insert("opencode".to_owned(), json!({ "synthetic": true }));
764    options
765}
766
767fn millis_at(value: &Value, path: &[&str]) -> Option<DateTime<Utc>> {
768    let mut cursor = value;
769    for key in path {
770        cursor = cursor.get(key)?;
771    }
772    DateTime::from_timestamp_millis(cursor.as_i64()?)
773}
774
775fn is_synthetic(options: &ProviderOptions) -> bool {
776    options
777        .get("opencode")
778        .and_then(|o| o.get("synthetic"))
779        .and_then(Value::as_bool)
780        == Some(true)
781}
782
783fn serialize_native(
784    session: &crate::sessions::SessionWithMessages,
785) -> Result<Vec<RestoredFile>, AdapterError> {
786    // Native replays each stored `raw_record` at its original split path; the
787    // synthetic `Tool` message and `ToolResult` (which carry no `raw_record`)
788    // are skipped, re-fusing into the single source `tool` part. Replay echoes
789    // a frozen snapshot - safe only while canonical is append-only
790    // (spec.md#adapter-integrity-additive-sync).
791    //
792    // spec.md#adapter-native-restore-lossless: when the session lacks a stored
793    // `raw_record` (older ingest, foreign-sourced session), native is
794    // impossible. We downgrade to foreign and stamp `actual_fidelity` so the
795    // caller can surface the downgrade instead of getting a silent surprise.
796    let Some(session_raw) = raw_record(&session.session.options) else {
797        return serialize_foreign(session);
798    };
799    let project_id = session_raw
800        .get("projectID")
801        .and_then(Value::as_str)
802        .ok_or_else(|| {
803            AdapterError::schema(
804                NAME,
805                session.session.id.clone(),
806                "stored session raw_record missing projectID",
807            )
808        })?;
809
810    let mut files = vec![RestoredFile::new(
811        PathBuf::from("session")
812            .join(project_id)
813            .join(format!("{}.json", session.session.id)),
814        encode(&session_raw, &session.session.id)?,
815        RestoreFidelity::Native,
816    )];
817
818    for message in &session.messages {
819        if !is_synthetic(message.message.options())
820            && let Some(raw) = raw_record(message.message.options())
821        {
822            files.push(RestoredFile::new(
823                PathBuf::from("message")
824                    .join(&session.session.id)
825                    .join(format!("{}.json", message.message.id())),
826                encode(&raw, message.message.id())?,
827                RestoreFidelity::Native,
828            ));
829        }
830        for part in &message.parts {
831            // A part that carries a `raw_record` maps 1:1 to a source file at
832            // `part/<message_id>/<part_id>.json`; synthetic split parts do not.
833            if let Some(raw) = raw_record(&part.options) {
834                files.push(RestoredFile::new(
835                    PathBuf::from("part")
836                        .join(&part.message_id)
837                        .join(format!("{}.json", part.id)),
838                    encode(&raw, &part.id)?,
839                    RestoreFidelity::Native,
840                ));
841            }
842        }
843    }
844    Ok(files)
845}
846
847fn serialize_foreign(
848    session: &crate::sessions::SessionWithMessages,
849) -> Result<Vec<RestoredFile>, AdapterError> {
850    // Foreign restore: a best-effort, idiomatic opencode tree. A non-opencode
851    // session has no `projectID` hash, so derive a stable directory key from
852    // the project path; tool results (canonical `Tool` messages) and System
853    // carriers have no idiomatic home in opencode's part model and are dropped
854    // (spec.md#adapter-native-restore-lossless, foreign clause).
855    let project_id = encode_project(&session.session.project);
856    let created = session.session.created_at.timestamp_millis();
857    let session_record = json!({
858        "id": session.session.id,
859        "projectID": project_id,
860        "directory": &*session.session.project,
861        "time": { "created": created, "updated": created },
862    });
863    let mut files = vec![RestoredFile::new(
864        PathBuf::from("session")
865            .join(&project_id)
866            .join(format!("{}.json", session.session.id)),
867        encode(&session_record, &session.session.id)?,
868        RestoreFidelity::Foreign,
869    )];
870
871    for message in &session.messages {
872        let role = match message.message {
873            Message::User { .. } => "user",
874            Message::Assistant { .. } => "assistant",
875            // No idiomatic opencode home; content stays in canonical.
876            Message::Tool { .. } | Message::System { .. } => continue,
877        };
878        let created = message.message.timestamp().timestamp_millis();
879        let record = json!({
880            "id": message.message.id(),
881            "sessionID": session.session.id,
882            "role": role,
883            "time": { "created": created },
884        });
885        files.push(RestoredFile::new(
886            PathBuf::from("message")
887                .join(&session.session.id)
888                .join(format!("{}.json", message.message.id())),
889            encode(&record, message.message.id())?,
890            RestoreFidelity::Foreign,
891        ));
892        for part in &message.parts {
893            let Some(record) = foreign_part(&session.session.id, part) else {
894                continue;
895            };
896            files.push(RestoredFile::new(
897                PathBuf::from("part")
898                    .join(message.message.id())
899                    .join(format!("{}.json", part.id)),
900                encode(&record, &part.id)?,
901                RestoreFidelity::Foreign,
902            ));
903        }
904    }
905    Ok(files)
906}
907
908fn foreign_part(session_id: &str, part: &Part) -> Option<Value> {
909    let mut record = match &part.kind {
910        PartKind::Text { text } => json!({
911            "type": "text",
912            "text": text.as_deref().map(|t| &**t),
913            "synthetic": part.provenance == Provenance::Injected,
914        }),
915        PartKind::Reasoning { text } => json!({
916            "type": "reasoning",
917            "text": text.as_deref().map(|t| &**t),
918        }),
919        PartKind::File {
920            media_type,
921            file_name,
922            data,
923        } => json!({
924            "type": "file",
925            "mime": media_type,
926            "filename": file_name,
927            "url": match data {
928                FileData::Url(url) => Some(url.clone()),
929                _ => None,
930            },
931        }),
932        PartKind::ToolCall {
933            call_id,
934            name,
935            params,
936            ..
937        } => json!({
938            "type": "tool",
939            "callID": call_id.as_deref().map(|c| &**c),
940            "tool": name.as_deref().map(|n| &**n),
941            "state": { "status": "completed", "input": params },
942        }),
943        // ToolResult / approval parts have no standalone opencode shape.
944        _ => return None,
945    };
946    if let Value::Object(map) = &mut record {
947        map.insert("id".to_owned(), json!(part.id));
948        map.insert("sessionID".to_owned(), json!(session_id));
949        map.insert("messageID".to_owned(), json!(part.message_id));
950    }
951    Some(record)
952}
953
954fn encode(value: &Value, location: &str) -> Result<Vec<u8>, AdapterError> {
955    serde_json::to_vec(value).map_err(|error| {
956        AdapterError::schema(
957            NAME,
958            location.to_owned(),
959            format!("json encode failed: {error}"),
960        )
961    })
962}
963
964fn encode_project(project: &str) -> String {
965    project
966        .chars()
967        .map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
968        .collect()
969}
970
971#[cfg(test)]
972mod tests {
973    //! End-to-end test for the opencode adapter: ingest the committed
974    //! split-file fixture corpus and assert pond's canonical shape comes out
975    //! the other side, including the fused-tool-part split. The fixture lives
976    //! under `tests/fixtures/adapter/opencode/storage/`.
977    #![allow(clippy::expect_used, clippy::unwrap_used)]
978
979    use super::*;
980    use crate::{
981        adapter::extract::LEAF_CAP, handlers::ingest_adapter, sessions::Store, wire::PartKind,
982    };
983    use tempfile::TempDir;
984
985    const FIXTURES: &str = "tests/fixtures/adapter/opencode/storage";
986    const FRESH_SESSION_ID: &str = "ses_6405e5a5cffeIG2QHRuTmm4mA7";
987    const FRESH_MESSAGE_ID: &str = "msg_zzzzfresh0001";
988    const FRESH_PART_ID: &str = "prt_zzzzfresh0001";
989
990    struct FixedOracle {
991        session_id: &'static str,
992        ingested_at: DateTime<Utc>,
993    }
994
995    impl crate::adapter::SkipOracle for FixedOracle {
996        fn last_ingested_at(&self, session_id: &str) -> Option<DateTime<Utc>> {
997            (session_id == self.session_id).then_some(self.ingested_at)
998        }
999    }
1000
1001    #[test]
1002    fn probe_default_finds_opencode_storage_under_home() -> anyhow::Result<()> {
1003        crate::adapter::test_support::assert_probe_default(
1004            &OpencodeFactory,
1005            &[".local", "share", "opencode", "storage"],
1006        )
1007    }
1008
1009    #[tokio::test(flavor = "multi_thread")]
1010    async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
1011        let adapter = OpencodeAdapter::new(FIXTURES);
1012        crate::adapter::test_support::assert_native_restore(
1013            &OpencodeFactory,
1014            &adapter,
1015            // opencode relative paths are rooted at the `storage/` dir.
1016            std::path::Path::new(FIXTURES),
1017        )
1018        .await
1019    }
1020
1021    #[tokio::test(flavor = "multi_thread")]
1022    async fn freshness_uses_message_and_part_file_mtimes() -> anyhow::Result<()> {
1023        let temp = TempDir::new()?;
1024        let source = temp.path().join("storage");
1025        copy_dir(std::path::Path::new(FIXTURES), &source)?;
1026
1027        let store_dir = temp.path().join("store");
1028        let store = Store::open_local(&store_dir).await?;
1029        let adapter = OpencodeAdapter::new(&source);
1030        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1031
1032        let watermark = Utc::now();
1033        std::thread::sleep(std::time::Duration::from_millis(1100));
1034        append_fresh_opencode_turn(&source)?;
1035
1036        let oracle = FixedOracle {
1037            session_id: FRESH_SESSION_ID,
1038            ingested_at: watermark,
1039        };
1040        ingest_adapter(&store, &adapter, &oracle, |_| {}).await?;
1041
1042        let session = store
1043            .get_session(FRESH_SESSION_ID)
1044            .await?
1045            .expect("fixture session round-trips");
1046        let fresh = session
1047            .messages
1048            .iter()
1049            .find(|stored| stored.message.id() == FRESH_MESSAGE_ID)
1050            .expect("message added after the session file mtime must land");
1051        assert!(
1052            fresh.parts.iter().any(|part| matches!(
1053                &part.kind,
1054                PartKind::Text { text } if text.as_deref().map(|value| value.as_str()) == Some("fresh opencode text")
1055            )),
1056            "fresh message part must land with the re-read session",
1057        );
1058        Ok(())
1059    }
1060
1061    #[tokio::test(flavor = "multi_thread")]
1062    async fn malformed_part_file_drops_only_that_part() -> anyhow::Result<()> {
1063        let temp = TempDir::new()?;
1064        let source = temp.path().join("storage");
1065        write_minimal_session(&source, "ses_badpart", "msg_badpart")?;
1066        let part_dir = source.join("part").join("msg_badpart");
1067        std::fs::write(part_dir.join("prt_000_bad.json"), b"{not json")?;
1068        write_json_file(
1069            &part_dir.join("prt_999_good.json"),
1070            &json!({
1071                "id": "prt_999_good",
1072                "sessionID": "ses_badpart",
1073                "messageID": "msg_badpart",
1074                "type": "text",
1075                "text": "valid sibling survives",
1076                "synthetic": false,
1077            }),
1078        )?;
1079
1080        let store = Store::open_local(temp.path().join("store")).await?;
1081        let summary = ingest_adapter(
1082            &store,
1083            &OpencodeAdapter::new(&source),
1084            &crate::adapter::NoopOracle,
1085            |_| {},
1086        )
1087        .await?;
1088
1089        assert_eq!(summary.dropped_events, 1);
1090        let session = store
1091            .get_session("ses_badpart")
1092            .await?
1093            .expect("session with one malformed part still lands");
1094        let message = session
1095            .messages
1096            .iter()
1097            .find(|stored| stored.message.id() == "msg_badpart")
1098            .expect("message with valid sibling part still lands");
1099        assert!(message.parts.iter().any(|part| {
1100            matches!(
1101                &part.kind,
1102                PartKind::Text { text }
1103                    if text.as_deref().map(String::as_str) == Some("valid sibling survives")
1104            )
1105        }));
1106        Ok(())
1107    }
1108
1109    #[test]
1110    fn missing_message_timestamp_uses_session_anchor() -> anyhow::Result<()> {
1111        let session_anchor =
1112            DateTime::parse_from_rfc3339("2026-05-05T12:13:14Z")?.with_timezone(&Utc);
1113        let events = build_message_events(
1114            "ses_anchor",
1115            &json!({"id": "msg_no_time", "role": "user"}),
1116            &[],
1117            session_anchor,
1118        )?;
1119
1120        let IngestEvent::Message(message) = &events[0] else {
1121            panic!("first event is the message");
1122        };
1123        assert_eq!(message.timestamp(), session_anchor);
1124        Ok(())
1125    }
1126
1127    #[test]
1128    fn source_part_without_id_is_schema_error() {
1129        let session_anchor = DateTime::from_timestamp_millis(1_765_000_000_000).unwrap();
1130        let error = build_message_events(
1131            "ses_missing_part_id",
1132            &json!({
1133                "id": "msg_missing_part_id",
1134                "role": "assistant",
1135                "time": { "created": 1_765_000_000_000i64 },
1136            }),
1137            &[json!({"type": "text", "text": "cannot restore its filename"})],
1138            session_anchor,
1139        )
1140        .expect_err("part ids are required for native filename replay");
1141
1142        assert!(error.to_string().contains("part missing `id`"));
1143    }
1144
1145    #[test]
1146    fn read_json_bounds_oversized_string_leaves() -> anyhow::Result<()> {
1147        let temp = TempDir::new()?;
1148        let path = temp.path().join("oversized.json");
1149        write_json_file(
1150            &path,
1151            &json!({
1152                "id": "oversized",
1153                "text": "x".repeat(LEAF_CAP + 100),
1154            }),
1155        )?;
1156
1157        let value = read_json(&path)?;
1158        let text = value
1159            .get("text")
1160            .and_then(Value::as_str)
1161            .expect("text leaf survives as a bounded marker");
1162        assert!(text.len() <= LEAF_CAP);
1163        assert!(text.ends_with(&format!("{} bytes>", LEAF_CAP + 100)));
1164        Ok(())
1165    }
1166
1167    #[tokio::test(flavor = "multi_thread")]
1168    async fn opencode_adapter_ingests_fixture_corpus_into_canonical_shape() -> anyhow::Result<()> {
1169        let temp = TempDir::new()?;
1170        let store = Store::open_local(temp.path()).await?;
1171        let adapter = OpencodeAdapter::new(FIXTURES);
1172
1173        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1174        assert!(summary.accepted() > 0, "ingest must accept rows");
1175        assert_eq!(summary.dropped_events, 0, "no per-event drops expected");
1176        assert_eq!(
1177            summary.dropped_sessions, 0,
1178            "no session-level rejections expected"
1179        );
1180
1181        let (sessions, messages, parts) = store.row_counts().await?;
1182        assert!(sessions > 0, "at least one opencode session");
1183        assert!(messages > 0, "at least one opencode message");
1184        assert!(parts > 0, "at least one opencode Part");
1185
1186        let mut saw_call = false;
1187        let mut saw_result = false;
1188        let mut saw_injected_text = false;
1189        for session_id in store.session_ids().await? {
1190            let session = store
1191                .get_session(&session_id)
1192                .await?
1193                .expect("session round-trips");
1194            assert_eq!(session.session.source_agent, NAME);
1195            assert!(
1196                !(*session.session.project).is_empty(),
1197                "spec.md#model-project-non-empty",
1198            );
1199            for stored in &session.messages {
1200                for part in &stored.parts {
1201                    match &part.kind {
1202                        PartKind::ToolCall { .. } => saw_call = true,
1203                        PartKind::ToolResult { .. } => saw_result = true,
1204                        PartKind::Text { .. } if part.provenance == Provenance::Injected => {
1205                            saw_injected_text = true;
1206                        }
1207                        _ => {}
1208                    }
1209                }
1210            }
1211        }
1212        assert!(saw_call, "fused tool parts yield ToolCall on the assistant");
1213        assert!(
1214            saw_result,
1215            "fused tool parts split off a ToolResult on a Tool message",
1216        );
1217        assert!(
1218            saw_injected_text,
1219            "spec.md#model-part-provenance: synthetic text parts are injected",
1220        );
1221        Ok(())
1222    }
1223
1224    /// The synthetic `Tool` message a `tool` part splits off must carry a
1225    /// `Tool` role with one `ToolResult`, and must NOT collide with or
1226    /// overwrite the assistant message that owns the `ToolCall`.
1227    #[tokio::test(flavor = "multi_thread")]
1228    async fn fused_tool_part_splits_into_call_and_result() -> anyhow::Result<()> {
1229        let temp = TempDir::new()?;
1230        let store = Store::open_local(temp.path()).await?;
1231        let adapter = OpencodeAdapter::new(FIXTURES);
1232        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1233
1234        let mut call_ids = std::collections::HashSet::new();
1235        let mut result_ids = std::collections::HashSet::new();
1236        let mut saw_failure = false;
1237        for session_id in store.session_ids().await? {
1238            let session = store
1239                .get_session(&session_id)
1240                .await?
1241                .expect("session round-trips");
1242            for stored in &session.messages {
1243                for part in &stored.parts {
1244                    match &part.kind {
1245                        PartKind::ToolCall { call_id, .. } => {
1246                            if let Some(id) = call_id.as_deref() {
1247                                call_ids.insert(id.clone());
1248                            }
1249                        }
1250                        PartKind::ToolResult {
1251                            call_id,
1252                            is_failure,
1253                            result,
1254                            ..
1255                        } => {
1256                            assert!(
1257                                matches!(stored.message, Message::Tool { .. }),
1258                                "a ToolResult must live on a Tool-role message",
1259                            );
1260                            if *is_failure {
1261                                saw_failure = true;
1262                                assert_ne!(
1263                                    result,
1264                                    &Value::Null,
1265                                    "failed tool results must carry the source error/output payload",
1266                                );
1267                            }
1268                            if let Some(id) = call_id.as_deref() {
1269                                result_ids.insert(id.clone());
1270                            }
1271                        }
1272                        _ => {}
1273                    }
1274                }
1275            }
1276        }
1277        assert!(!call_ids.is_empty(), "corpus has tool calls");
1278        assert_eq!(
1279            call_ids, result_ids,
1280            "every tool call's id is matched by its split-off result",
1281        );
1282        assert!(
1283            saw_failure,
1284            "fixture has at least one failed opencode tool result"
1285        );
1286        Ok(())
1287    }
1288
1289    #[tokio::test(flavor = "multi_thread")]
1290    async fn foreign_serialization_reparses_as_opencode() -> anyhow::Result<()> {
1291        let temp = TempDir::new()?;
1292        let origin_store = Store::open_local(temp.path().join("origin-store")).await?;
1293        let origin = crate::adapter::PiCodingAgentAdapter::new(
1294            "tests/fixtures/adapter/pi-coding-agent/sessions",
1295        );
1296        ingest_adapter(&origin_store, &origin, &crate::adapter::NoopOracle, |_| {}).await?;
1297        let session_id = origin_store
1298            .session_ids()
1299            .await?
1300            .into_iter()
1301            .next()
1302            .expect("pi fixture has sessions");
1303        let session = origin_store
1304            .get_session(&session_id)
1305            .await?
1306            .expect("fixture session is readable");
1307
1308        let restored_root = temp.path().join("opencode-storage");
1309        crate::adapter::write_restored_files(
1310            &restored_root,
1311            OpencodeFactory.serialize(&session, RestoreFidelity::Foreign)?,
1312        )?;
1313        let restored_store = Store::open_local(temp.path().join("restored-store")).await?;
1314        let summary = ingest_adapter(
1315            &restored_store,
1316            &OpencodeAdapter::new(&restored_root),
1317            &crate::adapter::NoopOracle,
1318            |_| {},
1319        )
1320        .await?;
1321
1322        assert!(summary.accepted() > 0);
1323        assert_eq!(summary.dropped_events, 0);
1324        Ok(())
1325    }
1326
1327    #[test]
1328    fn path_ids_reject_separators_and_traversal() {
1329        let where_ = "session/project/session.json";
1330        assert!(validate_path_id(NAME, "session id", "ses_safe", where_).is_ok());
1331        assert!(validate_path_id(NAME, "session id", "../ses", where_).is_err());
1332        assert!(validate_path_id(NAME, "session id", "/tmp/ses", where_).is_err());
1333        assert!(validate_path_id(NAME, "message id", "msg/a", where_).is_err());
1334        assert!(validate_path_id(NAME, "message id", "msg\\a", where_).is_err());
1335    }
1336
1337    fn append_fresh_opencode_turn(root: &std::path::Path) -> anyhow::Result<()> {
1338        let message_dir = root.join("message").join(FRESH_SESSION_ID);
1339        let part_dir = root.join("part").join(FRESH_MESSAGE_ID);
1340        std::fs::create_dir_all(&message_dir)?;
1341        std::fs::create_dir_all(&part_dir)?;
1342        std::fs::write(
1343            message_dir.join(format!("{FRESH_MESSAGE_ID}.json")),
1344            serde_json::to_vec(&json!({
1345                "id": FRESH_MESSAGE_ID,
1346                "sessionID": FRESH_SESSION_ID,
1347                "role": "user",
1348                "time": { "created": 1759859999000i64 }
1349            }))?,
1350        )?;
1351        std::fs::write(
1352            part_dir.join(format!("{FRESH_PART_ID}.json")),
1353            serde_json::to_vec(&json!({
1354                "id": FRESH_PART_ID,
1355                "sessionID": FRESH_SESSION_ID,
1356                "messageID": FRESH_MESSAGE_ID,
1357                "type": "text",
1358                "text": "fresh opencode text",
1359                "synthetic": false
1360            }))?,
1361        )?;
1362        Ok(())
1363    }
1364
1365    fn write_minimal_session(
1366        root: &std::path::Path,
1367        session_id: &str,
1368        message_id: &str,
1369    ) -> anyhow::Result<()> {
1370        write_json_file(
1371            &root
1372                .join("session")
1373                .join("project")
1374                .join(format!("{session_id}.json")),
1375            &json!({
1376                "id": session_id,
1377                "projectID": "project",
1378                "directory": "/tmp/project",
1379                "time": { "created": 1_765_000_000_000i64, "updated": 1_765_000_000_000i64 },
1380            }),
1381        )?;
1382        write_json_file(
1383            &root
1384                .join("message")
1385                .join(session_id)
1386                .join(format!("{message_id}.json")),
1387            &json!({
1388                "id": message_id,
1389                "sessionID": session_id,
1390                "role": "assistant",
1391                "time": { "created": 1_765_000_000_001i64 },
1392            }),
1393        )?;
1394        std::fs::create_dir_all(root.join("part").join(message_id))?;
1395        Ok(())
1396    }
1397
1398    fn write_json_file(path: &std::path::Path, value: &Value) -> anyhow::Result<()> {
1399        if let Some(parent) = path.parent() {
1400            std::fs::create_dir_all(parent)?;
1401        }
1402        std::fs::write(path, serde_json::to_vec(value)?)?;
1403        Ok(())
1404    }
1405
1406    fn copy_dir(from: &std::path::Path, to: &std::path::Path) -> anyhow::Result<()> {
1407        std::fs::create_dir_all(to)?;
1408        for entry in std::fs::read_dir(from)? {
1409            let entry = entry?;
1410            let source = entry.path();
1411            let target = to.join(entry.file_name());
1412            if entry.file_type()?.is_dir() {
1413                copy_dir(&source, &target)?;
1414            } else {
1415                std::fs::copy(&source, &target)?;
1416            }
1417        }
1418        Ok(())
1419    }
1420}