Skip to main content

pond/adapter/
opencode.rs

1//! opencode adapter (github.com/sst/opencode).
2//!
3//! Unlike claude-code and codex-cli, opencode does not store one JSONL file per
4//! session. It uses a content-addressed split layout under a `storage/` root:
5//!
6//! - `session/<projectID>/<sessionID>.json` - session metadata
7//! - `message/<sessionID>/<messageID>.json` - one message header (no content)
8//! - `part/<messageID>/<partID>.json`        - one content part
9//!
10//! So this is the first adapter to drive the [`Adapter`] seam directly rather
11//! than through the JSONL helper: it walks the three levels, sorts by id (the
12//! ids are lexically time-sortable, so filename order is creation order), and
13//! emits `Session -> Message -> Parts` per session.
14//!
15//! opencode fuses a tool call and its result into one `tool` part on the
16//! assistant message. Canonical keeps the two apart (a `tool_result` on an
17//! assistant message is a category error, spec.md#model-part-provenance), so the
18//! adapter splits it: a `ToolCall` Part stays on the assistant message and a
19//! synthetic `Tool` message carries the `ToolResult`. Native restore replays
20//! each real part's stored `raw_record` at its original path and skips the
21//! synthetic records, so the split is value-complete-lossless.
22
23use std::path::{Path, PathBuf};
24
25use async_stream::stream;
26use chrono::{DateTime, Utc};
27use serde_json::{Value, json};
28use tokio::sync::mpsc;
29
30use crate::{
31    sessions::IngestEvent,
32    wire::{FileData, Message, Part, PartKind, Provenance, ProviderOptions, Session},
33};
34
35use super::{
36    Adapter, AdapterError, AdapterFactory, AdapterYield, AdapterYieldStream, DiscoverFuture, Env,
37    RestoreFidelity, RestoredFile, SkipOracle, SkipReason, compact_json, config_path,
38    extract::{bound_value, extract_str},
39    jsonl::RECORD_CAP,
40    part_id, part_ordinal, raw_record, source_options, validate_path_id,
41};
42
43const NAME: &str = "opencode";
44
45/// Event-channel bound; doubles as backpressure - the blocking reader parks on
46/// `blocking_send` when the consumer lags.
47const CHANNEL_CAP: usize = 256;
48
49/// Stateless factory: opens [`OpencodeAdapter`] instances and probes for the
50/// canonical install location under `~/.local/share/opencode/storage`.
51pub struct OpencodeFactory;
52
53impl AdapterFactory for OpencodeFactory {
54    fn name(&self) -> &'static str {
55        NAME
56    }
57
58    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
59        Ok(Box::new(OpencodeAdapter::new(config_path(NAME, config)?)))
60    }
61
62    fn probe_default(&self, env: &Env) -> Option<Value> {
63        let path = env
64            .home
65            .join(".local")
66            .join("share")
67            .join("opencode")
68            .join("storage");
69        path.exists().then(|| json!({ "path": path }))
70    }
71
72    fn serialize(
73        &self,
74        session: &crate::sessions::SessionWithMessages,
75        fidelity: RestoreFidelity,
76    ) -> Result<Vec<RestoredFile>, AdapterError> {
77        match fidelity {
78            RestoreFidelity::Native => serialize_native(session),
79            RestoreFidelity::Foreign => serialize_foreign(session),
80        }
81    }
82}
83
84/// Configured opencode reader, rooted at a `storage/` directory.
85#[derive(Debug, Clone)]
86pub struct OpencodeAdapter {
87    root: PathBuf,
88}
89
90impl OpencodeAdapter {
91    pub fn new(root: impl Into<PathBuf>) -> Self {
92        Self { root: root.into() }
93    }
94}
95
96impl Adapter for OpencodeAdapter {
97    fn discover(&self) -> DiscoverFuture<'_> {
98        let root = self.root.clone();
99        Box::pin(async move {
100            tokio::task::spawn_blocking(move || {
101                collect_session_files(&root).map(|files| files.len())
102            })
103            .await
104            .map_err(join_error)?
105        })
106    }
107
108    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
109        let adapter = self.clone();
110        Box::pin(stream! {
111            let files = {
112                let root = adapter.root.clone();
113                tokio::task::spawn_blocking(move || collect_session_files(&root)).await
114            };
115            let files = match files {
116                Ok(Ok(files)) => files,
117                Ok(Err(error)) => { yield Err(error); return; }
118                Err(join) => { yield Err(join_error(join)); return; }
119            };
120
121            // Subtree walks happen ONLY when the oracle has entries - on a first
122            // ingest (or NoopOracle) every walk is wasted work since there's
123            // nothing to compare against. The walk also yields the session's
124            // newest stored timestamp (last message + its tool parts) for the
125            // freshness gate.
126            let mut survivors = Vec::with_capacity(files.len());
127            for mut file in files {
128                if !oracle.is_empty() {
129                    let walked = {
130                        let root = adapter.root.clone();
131                        let session_path = file.path.clone();
132                        let session_id = file.session_id.clone();
133                        tokio::task::spawn_blocking(move || {
134                            let walk = walk_session_subtree(&root, &session_path, &session_id)?;
135                            let last_ts = newest_message_ts(&walk);
136                            Ok::<_, AdapterError>((walk, last_ts))
137                        })
138                        .await
139                    };
140                    let (walk, last_ts) = match walked {
141                        Ok(Ok(pair)) => pair,
142                        Ok(Err(error)) => { yield Err(error); return; }
143                        Err(join) => { yield Err(join_error(join)); return; }
144                    };
145                    if crate::adapter::is_session_fresh(oracle, &file.session_id, last_ts) {
146                        yield Ok(AdapterYield::Skipped {
147                            session_id: Some(file.session_id.clone()),
148                            project: None,
149                            reason: SkipReason::Fresh,
150                        });
151                        continue;
152                    }
153                    // Cache the walk so the read pass doesn't re-list the
154                    // same dirs (we already paid for them above).
155                    file.cached_subtree = Some(walk);
156                }
157                survivors.push(file);
158            }
159
160            let (tx, mut rx) = mpsc::channel(CHANNEL_CAP);
161            let reader = adapter.clone();
162            let handle = tokio::task::spawn_blocking(move || read_sessions(&reader, survivors, &tx));
163            while let Some(item) = rx.recv().await {
164                yield item;
165            }
166            if let Err(join) = handle.await {
167                yield Err(join_error(join));
168            }
169        })
170    }
171}
172
173/// A blocking-task panic is a pond bug, not bad source data, so it fails the
174/// whole run rather than skipping a session.
175fn join_error(join: tokio::task::JoinError) -> AdapterError {
176    AdapterError::io(
177        NAME,
178        "blocking read task",
179        std::io::Error::other(join.to_string()),
180    )
181}
182
183/// One session file located on disk. `cached_subtree` is populated only when
184/// the freshness pre-walk happened (i.e. the oracle had a watermark for this
185/// session); the read pass reuses the listings instead of re-walking.
186struct SessionFile {
187    session_id: String,
188    path: PathBuf,
189    cached_subtree: Option<SubtreeWalk>,
190}
191
192/// Result of one subtree walk: the message and part directory listings (so the
193/// read pass doesn't redo them). The last `message_files` entry is the session's
194/// latest message id for the freshness check.
195struct SubtreeWalk {
196    message_files: Vec<PathBuf>,
197    /// One entry per message file, in the same order; each is the sorted list
198    /// of part files for that message. Empty vec = message has no parts.
199    part_files_by_message: Vec<Vec<PathBuf>>,
200}
201
202/// Walk `<root>/session/<projectID>/<sessionID>.json`, sorted for deterministic
203/// ingest order. A missing `session/` dir means "no sessions yet", not an error.
204fn collect_session_files(root: &Path) -> Result<Vec<SessionFile>, AdapterError> {
205    let session_root = root.join("session");
206    let io = |path: &Path, source| AdapterError::io(NAME, path.display().to_string(), source);
207    let entries = match std::fs::read_dir(&session_root) {
208        Ok(entries) => entries,
209        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
210        Err(error) => return Err(io(&session_root, error)),
211    };
212    let mut out = Vec::new();
213    for project in entries {
214        let project = project.map_err(|error| io(&session_root, error))?;
215        if !project
216            .file_type()
217            .map_err(|error| io(&project.path(), error))?
218            .is_dir()
219        {
220            continue;
221        }
222        let project_dir = project.path();
223        for session in std::fs::read_dir(&project_dir).map_err(|error| io(&project_dir, error))? {
224            let session = session.map_err(|error| io(&project_dir, error))?;
225            let path = session.path();
226            if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
227                continue;
228            }
229            let Some(session_id) = path
230                .file_stem()
231                .and_then(|s| s.to_str())
232                .map(ToOwned::to_owned)
233            else {
234                continue;
235            };
236            validate_path_id(
237                NAME,
238                "session file name",
239                &session_id,
240                path.display().to_string(),
241            )?;
242            out.push(SessionFile {
243                session_id,
244                path,
245                cached_subtree: None,
246            });
247        }
248    }
249    out.sort_by(|a, b| a.path.cmp(&b.path));
250    Ok(out)
251}
252
253/// Walk one session's full subtree: the message files under `message/<sid>/` and
254/// every part file under `part/<mid>/`, returning the listings so the read pass
255/// can reuse them. `session_path` is unused now that freshness keys on the latest
256/// message id rather than a subtree mtime.
257fn walk_session_subtree(
258    root: &Path,
259    _session_path: &Path,
260    session_id: &str,
261) -> Result<SubtreeWalk, AdapterError> {
262    let message_dir = root.join("message").join(session_id);
263    let message_files = list_json_sorted(&message_dir)?;
264    let mut part_files_by_message = Vec::with_capacity(message_files.len());
265    for message_path in &message_files {
266        let Some(message_id) = message_path.file_stem().and_then(|stem| stem.to_str()) else {
267            part_files_by_message.push(Vec::new());
268            continue;
269        };
270        validate_path_id(
271            NAME,
272            "message file name",
273            message_id,
274            message_path.display().to_string(),
275        )?;
276        let part_dir = root.join("part").join(message_id);
277        let parts = list_json_sorted(&part_dir)?;
278        part_files_by_message.push(parts);
279    }
280    Ok(SubtreeWalk {
281        message_files,
282        part_files_by_message,
283    })
284}
285
286/// Watermark for the freshness gate: the session's max stored message timestamp.
287/// That is the last message's `time.created` or any of its tool parts'
288/// `state.time.end` (a tool result completing after the message was created);
289/// earlier messages' events all precede the last message, so its subtree
290/// suffices. `None` on an empty session or unreadable last message -> safe
291/// re-read. Reads only the last message and its parts (a handful of small files).
292fn newest_message_ts(walk: &SubtreeWalk) -> Option<i64> {
293    let message_path = walk.message_files.last()?;
294    let message = read_json(message_path).ok()?;
295    let mut newest = millis_at(&message, &["time", "created"])?;
296    if let Some(parts) = walk.part_files_by_message.last() {
297        for part_path in parts {
298            if let Ok(part) = read_json(part_path)
299                && let Some(end) = millis_at(&part, &["state", "time", "end"])
300            {
301                newest = newest.max(end);
302            }
303        }
304    }
305    Some(newest.timestamp_micros())
306}
307
308fn read_sessions(
309    adapter: &OpencodeAdapter,
310    sessions: Vec<SessionFile>,
311    tx: &mpsc::Sender<Result<AdapterYield, AdapterError>>,
312) {
313    for session in sessions {
314        if !read_one_session(adapter, session, tx) {
315            return;
316        }
317    }
318}
319
320/// Returns `false` when the consumer dropped the receiver and the read should stop.
321fn read_one_session(
322    adapter: &OpencodeAdapter,
323    file: SessionFile,
324    tx: &mpsc::Sender<Result<AdapterYield, AdapterError>>,
325) -> bool {
326    macro_rules! emit {
327        ($item:expr) => {
328            if tx.blocking_send($item).is_err() {
329                return false;
330            }
331        };
332    }
333
334    let session_value = match read_json(&file.path) {
335        Ok(value) => value,
336        Err(error) => {
337            emit!(Err(error));
338            return true;
339        }
340    };
341    let session = match session_from_value(&session_value, &file.path) {
342        Ok(session) => session,
343        Err(error) => {
344            emit!(Err(error));
345            return true;
346        }
347    };
348    let session_id = session.id.clone();
349    if let Err(error) = validate_path_id(
350        NAME,
351        "session id",
352        &session_id,
353        file.path.display().to_string(),
354    ) {
355        emit!(Err(error));
356        return true;
357    }
358    let session_created_at = session.created_at;
359    emit!(Ok(AdapterYield::Event(IngestEvent::Session(session))));
360
361    // Reuse the freshness pre-walk's listings when present; otherwise list now.
362    let (message_files, mut part_files_by_message) = match file.cached_subtree {
363        Some(walk) => (walk.message_files, walk.part_files_by_message),
364        None => {
365            let message_dir = adapter.root.join("message").join(&session_id);
366            let files = match list_json_sorted(&message_dir) {
367                Ok(files) => files,
368                Err(error) => {
369                    emit!(Err(error));
370                    return true;
371                }
372            };
373            (files, Vec::new())
374        }
375    };
376    let use_cache = !part_files_by_message.is_empty();
377
378    for (index, message_path) in message_files.iter().enumerate() {
379        let message_value = match read_json(message_path) {
380            Ok(value) => value,
381            Err(error) => {
382                emit!(Err(error));
383                continue;
384            }
385        };
386        let Some(message_id) = message_value.get("id").and_then(Value::as_str) else {
387            emit!(Err(AdapterError::schema(
388                NAME,
389                message_path.display().to_string(),
390                "message file missing `id`",
391            )));
392            continue;
393        };
394        if let Err(error) = validate_path_id(
395            NAME,
396            "message id",
397            message_id,
398            message_path.display().to_string(),
399        ) {
400            emit!(Err(error));
401            continue;
402        }
403        let part_files = if use_cache {
404            std::mem::take(&mut part_files_by_message[index])
405        } else {
406            let part_dir = adapter.root.join("part").join(message_id);
407            match list_json_sorted(&part_dir) {
408                Ok(files) => files,
409                Err(error) => {
410                    emit!(Err(error));
411                    continue;
412                }
413            }
414        };
415        let mut parts = Vec::with_capacity(part_files.len());
416        for part_path in part_files {
417            match read_json(&part_path) {
418                Ok(value) => parts.push(value),
419                Err(error) => emit!(Err(error)),
420            }
421        }
422        match build_message_events(&session_id, &message_value, &parts, session_created_at) {
423            Ok(events) => {
424                for event in events {
425                    emit!(Ok(AdapterYield::Event(event)));
426                }
427            }
428            Err(error) => emit!(Err(error)),
429        }
430    }
431    true
432}
433
434/// Read one JSON file, bounding every string leaf at the seam cap
435/// (spec.md#adapter-bounded-values) before it leaves this module. One open +
436/// one metadata syscall on the handle - avoids the duplicate `std::fs::metadata`
437/// + `std::fs::read` pair on the ~1k-file real corpus.
438fn read_json(path: &Path) -> Result<Value, AdapterError> {
439    use std::io::Read;
440    let io = |source| AdapterError::io(NAME, path.display().to_string(), source);
441    let mut file = std::fs::File::open(path).map_err(io)?;
442    let len = file.metadata().map_err(io)?.len();
443    if len > RECORD_CAP as u64 {
444        return Err(AdapterError::schema(
445            NAME,
446            path.display().to_string(),
447            format!("json file exceeds adapter record cap: {len} bytes > {RECORD_CAP}"),
448        ));
449    }
450    let mut bytes = Vec::with_capacity(len as usize);
451    file.read_to_end(&mut bytes).map_err(io)?;
452    let mut value: Value = serde_json::from_slice(&bytes)
453        .map_err(|error| AdapterError::parse(NAME, path.display().to_string(), 1, error))?;
454    bound_value(&mut value);
455    Ok(value)
456}
457
458/// List `*.json` files in `dir`, sorted by filename (= creation order, ids are
459/// time-sortable). A missing dir is an empty list - a message can legitimately
460/// carry no parts.
461fn list_json_sorted(dir: &Path) -> Result<Vec<PathBuf>, AdapterError> {
462    let entries = match std::fs::read_dir(dir) {
463        Ok(entries) => entries,
464        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
465        Err(error) => return Err(AdapterError::io(NAME, dir.display().to_string(), error)),
466    };
467    let mut out = Vec::new();
468    for entry in entries {
469        let entry =
470            entry.map_err(|error| AdapterError::io(NAME, dir.display().to_string(), error))?;
471        let path = entry.path();
472        if path.extension().and_then(|ext| ext.to_str()) == Some("json") {
473            out.push(path);
474        }
475    }
476    out.sort();
477    Ok(out)
478}
479
480fn session_from_value(value: &Value, path: &Path) -> Result<Session, AdapterError> {
481    let display = path.display().to_string();
482    let id = value
483        .get("id")
484        .and_then(Value::as_str)
485        .ok_or_else(|| AdapterError::schema(NAME, display.clone(), "session missing `id`"))?
486        .to_owned();
487    let created_at = millis_at(value, &["time", "created"]).ok_or_else(|| {
488        AdapterError::schema(NAME, display.clone(), "session missing `time.created`")
489    })?;
490    // spec.md#model-project-non-empty: opencode always records `directory` (the
491    // project cwd); its absence is a malformed session, not a default.
492    let project = extract_str(value, "directory")
493        .ok_or_else(|| AdapterError::schema(NAME, display, "session missing `directory`"))?;
494
495    let options = opencode_raw(value);
496
497    Ok(Session {
498        id,
499        // opencode sub-sessions (a Task spawn) carry `parentID`; a soft
500        // reference, present only when this session was spawned from another.
501        parent_session_id: value
502            .get("parentID")
503            .and_then(Value::as_str)
504            .map(ToOwned::to_owned),
505        parent_message_id: None,
506        source_agent: NAME.to_owned(),
507        created_at,
508        project,
509        options,
510    })
511}
512
513/// Build the ordered event stream for one message: the message, its parts in
514/// order, then any synthetic `Tool` messages (one per `tool` part) each
515/// followed by its `ToolResult`.
516fn build_message_events(
517    session_id: &str,
518    message_value: &Value,
519    part_values: &[Value],
520    default_timestamp: DateTime<Utc>,
521) -> Result<Vec<IngestEvent>, AdapterError> {
522    let message_id = message_value
523        .get("id")
524        .and_then(Value::as_str)
525        .ok_or_else(|| AdapterError::schema(NAME, session_id.to_owned(), "message missing `id`"))?;
526    let role = message_value.get("role").and_then(Value::as_str);
527    let timestamp = millis_at(message_value, &["time", "created"]).unwrap_or(default_timestamp);
528
529    let options = opencode_raw(message_value);
530    let message = match role {
531        Some("user") => Message::User {
532            id: message_id.to_owned(),
533            session_id: session_id.to_owned(),
534            timestamp,
535            options,
536        },
537        Some("assistant") => Message::Assistant {
538            id: message_id.to_owned(),
539            session_id: session_id.to_owned(),
540            timestamp,
541            options,
542        },
543        // opencode v2 carries non-conversational message roles (synthetic,
544        // shell, compaction); keep them as System carriers rather than drop
545        // them (spec.md#adapter-integrity-no-silent-drops). The raw record
546        // survives in options; the role label is the content.
547        _ => Message::System {
548            id: message_id.to_owned(),
549            session_id: session_id.to_owned(),
550            timestamp,
551            content: extract_str(message_value, "role"),
552            options,
553        },
554    };
555
556    let mut events = vec![IngestEvent::Message(message)];
557    let mut deferred = Vec::new();
558    for (ordinal, part_value) in part_values.iter().enumerate() {
559        let mapped = map_part(session_id, message_id, ordinal, part_value, timestamp)?;
560        events.push(IngestEvent::Part(mapped.part));
561        if let Some(split) = mapped.tool_split {
562            deferred.push(split);
563        }
564    }
565    for ToolSplit {
566        message: tool_message,
567        result,
568    } in deferred
569    {
570        events.push(IngestEvent::Message(tool_message));
571        events.push(IngestEvent::Part(result));
572    }
573    Ok(events)
574}
575
576/// One mapped source part: the canonical Part it becomes, plus - for a fused
577/// `tool` part - the synthetic `Tool` message and `ToolResult` it splits off.
578struct MappedPart {
579    part: Part,
580    tool_split: Option<ToolSplit>,
581}
582
583struct ToolSplit {
584    message: Message,
585    result: Part,
586}
587
588fn map_part(
589    session_id: &str,
590    message_id: &str,
591    ordinal: usize,
592    value: &Value,
593    message_ts: DateTime<Utc>,
594) -> Result<MappedPart, AdapterError> {
595    let kind = value.get("type").and_then(Value::as_str);
596    let id = value
597        .get("id")
598        .and_then(Value::as_str)
599        .ok_or_else(|| AdapterError::schema(NAME, message_id.to_owned(), "part missing `id`"))?
600        .to_owned();
601
602    if kind == Some("tool") {
603        return Ok(tool_part(
604            session_id, message_id, &id, ordinal, value, message_ts,
605        ));
606    }
607
608    let (provenance, part_kind) = match kind {
609        Some("text") => (text_provenance(value), text_kind(value)),
610        Some("reasoning") => (Provenance::Conversational, reasoning_kind(value)),
611        Some("file") => (Provenance::Conversational, file_kind(value)),
612        // patch / step-start / step-finish (and any other marker) are
613        // harness-produced turn machinery, not conversation: keep them as
614        // injected Parts whose `raw_record` round-trips the source file.
615        _ => (Provenance::Injected, PartKind::Text { text: None }),
616    };
617
618    Ok(MappedPart {
619        part: Part {
620            session_id: session_id.to_owned(),
621            id,
622            message_id: message_id.to_owned(),
623            ordinal: part_ordinal(ordinal),
624            provenance,
625            options: opencode_raw(value),
626            kind: part_kind,
627        },
628        tool_split: None,
629    })
630}
631
632/// spec.md#model-part-provenance: opencode marks harness-injected text parts
633/// (the `Called the <tool> tool ...` echo, auto-expanded `@file` content) with
634/// `synthetic: true`; a genuine prompt or model reply is `synthetic: false`.
635fn text_provenance(value: &Value) -> Provenance {
636    if value.get("synthetic").and_then(Value::as_bool) == Some(true) {
637        Provenance::Injected
638    } else {
639        Provenance::Conversational
640    }
641}
642
643fn text_kind(value: &Value) -> PartKind {
644    PartKind::Text {
645        text: extract_str(value, "text"),
646    }
647}
648
649fn reasoning_kind(value: &Value) -> PartKind {
650    PartKind::Reasoning {
651        text: extract_str(value, "text"),
652    }
653}
654
655fn file_kind(value: &Value) -> PartKind {
656    // spec.md#model-no-synthesis: an absent mime hint is faithfully `None`,
657    // not a synthesized `application/octet-stream` placeholder.
658    let media_type = value
659        .get("mime")
660        .and_then(Value::as_str)
661        .map(ToOwned::to_owned);
662    let file_name = value
663        .get("filename")
664        .and_then(Value::as_str)
665        .map(ToOwned::to_owned);
666    let data = match value.get("url").and_then(Value::as_str) {
667        Some(url) => FileData::Url(url.to_owned()),
668        None => FileData::String(compact_json(value)),
669    };
670    PartKind::File {
671        media_type,
672        file_name,
673        data,
674    }
675}
676
677/// Split one opencode `tool` part (call + result fused) into a `ToolCall` on the
678/// owning assistant message and a synthetic `Tool` message carrying the
679/// `ToolResult`. The `ToolCall` keeps the source part's id and stores its full
680/// `raw_record`, so native restore reproduces the single source file; the
681/// synthetic records carry no `raw_record` and are skipped on restore.
682fn tool_part(
683    session_id: &str,
684    message_id: &str,
685    id: &str,
686    ordinal: usize,
687    value: &Value,
688    message_ts: DateTime<Utc>,
689) -> MappedPart {
690    let call_id = extract_str(value, "callID");
691    let name = extract_str(value, "tool");
692    let state = value.get("state");
693    let status = state.and_then(|s| s.get("status")).and_then(Value::as_str);
694    let result_ts = millis_at(value, &["state", "time", "end"]).unwrap_or(message_ts);
695
696    // Take input/output by moving them out of a single owned `state` clone
697    // rather than cloning each field separately - on the real corpus a fused
698    // tool part fans into three records (call + tool message + result), each
699    // of which used to clone its own slice of `state`.
700    let mut owned_state = state.cloned().unwrap_or(Value::Null);
701    let (input, result) = match owned_state.as_object_mut() {
702        Some(map) => {
703            let input = map.remove("input").unwrap_or(Value::Null);
704            let result = map
705                .remove("output")
706                .or_else(|| map.remove("error"))
707                .unwrap_or_else(|| {
708                    // No output/error - the rest of `state` IS the payload.
709                    std::mem::take(&mut owned_state)
710                });
711            (input, result)
712        }
713        None => (Value::Null, Value::Null),
714    };
715
716    let tool_call = Part {
717        session_id: session_id.to_owned(),
718        id: id.to_owned(),
719        message_id: message_id.to_owned(),
720        ordinal: part_ordinal(ordinal),
721        // spec.md#model-part-provenance: the model authored the tool call.
722        provenance: Provenance::Conversational,
723        options: opencode_raw(value),
724        kind: PartKind::ToolCall {
725            call_id: call_id.clone(),
726            name: name.clone(),
727            params: input,
728            provider_executed: false,
729        },
730    };
731
732    let tool_message_id = format!("{id}/result");
733    let tool_message = Message::Tool {
734        id: tool_message_id.clone(),
735        session_id: session_id.to_owned(),
736        timestamp: result_ts,
737        options: synthetic_options(),
738    };
739    let result_part = Part {
740        session_id: session_id.to_owned(),
741        id: part_id(&tool_message_id, 0),
742        message_id: tool_message_id,
743        ordinal: 0,
744        // spec.md#model-part-provenance: tool output is runtime-produced.
745        provenance: Provenance::Injected,
746        options: synthetic_options(),
747        kind: PartKind::ToolResult {
748            call_id,
749            name,
750            is_failure: status == Some("error"),
751            result,
752        },
753    };
754
755    MappedPart {
756        part: tool_call,
757        tool_split: Some(ToolSplit {
758            message: tool_message,
759            result: result_part,
760        }),
761    }
762}
763
764#[inline]
765fn opencode_raw(value: &Value) -> ProviderOptions {
766    source_options(NAME, value)
767}
768
769/// Marks a canonical record the adapter synthesized (the `Tool` message and
770/// `ToolResult` split off a fused `tool` part). Native restore skips records so
771/// marked - they correspond to no source file.
772fn synthetic_options() -> ProviderOptions {
773    let mut options = ProviderOptions::new();
774    options.insert("opencode".to_owned(), json!({ "synthetic": true }));
775    options
776}
777
778fn millis_at(value: &Value, path: &[&str]) -> Option<DateTime<Utc>> {
779    let mut cursor = value;
780    for key in path {
781        cursor = cursor.get(key)?;
782    }
783    DateTime::from_timestamp_millis(cursor.as_i64()?)
784}
785
786fn is_synthetic(options: &ProviderOptions) -> bool {
787    options
788        .get("opencode")
789        .and_then(|o| o.get("synthetic"))
790        .and_then(Value::as_bool)
791        == Some(true)
792}
793
794fn serialize_native(
795    session: &crate::sessions::SessionWithMessages,
796) -> Result<Vec<RestoredFile>, AdapterError> {
797    // Native replays each stored `raw_record` at its original split path; the
798    // synthetic `Tool` message and `ToolResult` (which carry no `raw_record`)
799    // are skipped, re-fusing into the single source `tool` part. Replay echoes
800    // a frozen snapshot - safe only while canonical is append-only
801    // (spec.md#adapter-integrity-additive-sync).
802    //
803    // spec.md#adapter-native-restore-lossless: when the session lacks a stored
804    // `raw_record` (older ingest, foreign-sourced session), native is
805    // impossible. We downgrade to foreign and stamp `actual_fidelity` so the
806    // caller can surface the downgrade instead of getting a silent surprise.
807    let Some(session_raw) = raw_record(&session.session.options) else {
808        return serialize_foreign(session);
809    };
810    let project_id = session_raw
811        .get("projectID")
812        .and_then(Value::as_str)
813        .ok_or_else(|| {
814            AdapterError::schema(
815                NAME,
816                session.session.id.clone(),
817                "stored session raw_record missing projectID",
818            )
819        })?;
820
821    let mut files = vec![RestoredFile::new(
822        PathBuf::from("session")
823            .join(project_id)
824            .join(format!("{}.json", session.session.id)),
825        encode(&session_raw, &session.session.id)?,
826        RestoreFidelity::Native,
827    )];
828
829    for message in &session.messages {
830        if !is_synthetic(message.message.options())
831            && let Some(raw) = raw_record(message.message.options())
832        {
833            files.push(RestoredFile::new(
834                PathBuf::from("message")
835                    .join(&session.session.id)
836                    .join(format!("{}.json", message.message.id())),
837                encode(&raw, message.message.id())?,
838                RestoreFidelity::Native,
839            ));
840        }
841        for part in &message.parts {
842            // A part that carries a `raw_record` maps 1:1 to a source file at
843            // `part/<message_id>/<part_id>.json`; synthetic split parts do not.
844            if let Some(raw) = raw_record(&part.options) {
845                files.push(RestoredFile::new(
846                    PathBuf::from("part")
847                        .join(&part.message_id)
848                        .join(format!("{}.json", part.id)),
849                    encode(&raw, &part.id)?,
850                    RestoreFidelity::Native,
851                ));
852            }
853        }
854    }
855    Ok(files)
856}
857
858fn serialize_foreign(
859    session: &crate::sessions::SessionWithMessages,
860) -> Result<Vec<RestoredFile>, AdapterError> {
861    // Foreign restore: a best-effort, idiomatic opencode tree. A non-opencode
862    // session has no `projectID` hash, so derive a stable directory key from
863    // the project path; tool results (canonical `Tool` messages) and System
864    // carriers have no idiomatic home in opencode's part model and are dropped
865    // (spec.md#adapter-native-restore-lossless, foreign clause).
866    let project_id = encode_project(&session.session.project);
867    let created = session.session.created_at.timestamp_millis();
868    let session_record = json!({
869        "id": session.session.id,
870        "projectID": project_id,
871        "directory": &*session.session.project,
872        "time": { "created": created, "updated": created },
873    });
874    let mut files = vec![RestoredFile::new(
875        PathBuf::from("session")
876            .join(&project_id)
877            .join(format!("{}.json", session.session.id)),
878        encode(&session_record, &session.session.id)?,
879        RestoreFidelity::Foreign,
880    )];
881
882    for message in &session.messages {
883        let role = match message.message {
884            Message::User { .. } => "user",
885            Message::Assistant { .. } => "assistant",
886            // No idiomatic opencode home; content stays in canonical.
887            Message::Tool { .. } | Message::System { .. } => continue,
888        };
889        let created = message.message.timestamp().timestamp_millis();
890        let record = json!({
891            "id": message.message.id(),
892            "sessionID": session.session.id,
893            "role": role,
894            "time": { "created": created },
895        });
896        files.push(RestoredFile::new(
897            PathBuf::from("message")
898                .join(&session.session.id)
899                .join(format!("{}.json", message.message.id())),
900            encode(&record, message.message.id())?,
901            RestoreFidelity::Foreign,
902        ));
903        for part in &message.parts {
904            let Some(record) = foreign_part(&session.session.id, part) else {
905                continue;
906            };
907            files.push(RestoredFile::new(
908                PathBuf::from("part")
909                    .join(message.message.id())
910                    .join(format!("{}.json", part.id)),
911                encode(&record, &part.id)?,
912                RestoreFidelity::Foreign,
913            ));
914        }
915    }
916    Ok(files)
917}
918
919fn foreign_part(session_id: &str, part: &Part) -> Option<Value> {
920    let mut record = match &part.kind {
921        PartKind::Text { text } => json!({
922            "type": "text",
923            "text": text.as_deref().map(|t| &**t),
924            "synthetic": part.provenance == Provenance::Injected,
925        }),
926        PartKind::Reasoning { text } => json!({
927            "type": "reasoning",
928            "text": text.as_deref().map(|t| &**t),
929        }),
930        PartKind::File {
931            media_type,
932            file_name,
933            data,
934        } => json!({
935            "type": "file",
936            "mime": media_type,
937            "filename": file_name,
938            "url": match data {
939                FileData::Url(url) => Some(url.clone()),
940                _ => None,
941            },
942        }),
943        PartKind::ToolCall {
944            call_id,
945            name,
946            params,
947            ..
948        } => json!({
949            "type": "tool",
950            "callID": call_id.as_deref().map(|c| &**c),
951            "tool": name.as_deref().map(|n| &**n),
952            "state": { "status": "completed", "input": params },
953        }),
954        // ToolResult / approval parts have no standalone opencode shape.
955        _ => return None,
956    };
957    if let Value::Object(map) = &mut record {
958        map.insert("id".to_owned(), json!(part.id));
959        map.insert("sessionID".to_owned(), json!(session_id));
960        map.insert("messageID".to_owned(), json!(part.message_id));
961    }
962    Some(record)
963}
964
965fn encode(value: &Value, location: &str) -> Result<Vec<u8>, AdapterError> {
966    serde_json::to_vec(value).map_err(|error| {
967        AdapterError::schema(
968            NAME,
969            location.to_owned(),
970            format!("json encode failed: {error}"),
971        )
972    })
973}
974
975fn encode_project(project: &str) -> String {
976    project
977        .chars()
978        .map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
979        .collect()
980}
981
982#[cfg(test)]
983mod tests {
984    //! End-to-end test for the opencode adapter: ingest the committed
985    //! split-file fixture corpus and assert pond's canonical shape comes out
986    //! the other side, including the fused-tool-part split. The fixture lives
987    //! under `tests/fixtures/adapter/opencode/storage/`.
988    #![allow(clippy::expect_used, clippy::unwrap_used)]
989
990    use super::*;
991    use crate::{
992        adapter::extract::LEAF_CAP, handlers::ingest_adapter, sessions::Store, wire::PartKind,
993    };
994    use tempfile::TempDir;
995
996    // Manifest-dir anchored: unit tests must not depend on the process cwd
997    // (figment::Jail chdirs the whole test process while config tests run).
998    const FIXTURES: &str = concat!(
999        env!("CARGO_MANIFEST_DIR"),
1000        "/tests/fixtures/adapter/opencode/storage"
1001    );
1002    const FRESH_SESSION_ID: &str = "ses_6405e5a5cffeIG2QHRuTmm4mA7";
1003    const FRESH_MESSAGE_ID: &str = "msg_zzzzfresh0001";
1004    const FRESH_PART_ID: &str = "prt_zzzzfresh0001";
1005
1006    struct FixedOracle {
1007        session_id: &'static str,
1008        watermark_micros: i64,
1009    }
1010
1011    impl crate::adapter::SkipOracle for FixedOracle {
1012        fn session_max_ts(&self, session_id: &str) -> Option<i64> {
1013            (session_id == self.session_id).then_some(self.watermark_micros)
1014        }
1015    }
1016
1017    #[test]
1018    fn probe_default_finds_opencode_storage_under_home() -> anyhow::Result<()> {
1019        crate::adapter::test_support::assert_probe_default(
1020            &OpencodeFactory,
1021            &[".local", "share", "opencode", "storage"],
1022        )
1023    }
1024
1025    #[tokio::test(flavor = "multi_thread")]
1026    async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
1027        let adapter = OpencodeAdapter::new(FIXTURES);
1028        crate::adapter::test_support::assert_native_restore(
1029            &OpencodeFactory,
1030            &adapter,
1031            // opencode relative paths are rooted at the `storage/` dir.
1032            std::path::Path::new(FIXTURES),
1033        )
1034        .await
1035    }
1036
1037    /// `append_fresh_opencode_turn` writes its message at this `time.created`
1038    /// (millis); the freshness gate keys on it in micros.
1039    const FRESH_TURN_MICROS: i64 = 1_759_859_999_000 * 1_000;
1040
1041    /// A session whose latest message is newer than the watermark is re-read, and
1042    /// the appended turn lands.
1043    #[tokio::test(flavor = "multi_thread")]
1044    async fn freshness_re_reads_a_session_that_gained_a_newer_message() -> anyhow::Result<()> {
1045        let temp = TempDir::new()?;
1046        let source = temp.path().join("storage");
1047        copy_dir(std::path::Path::new(FIXTURES), &source)?;
1048
1049        let store = Store::open_local(temp.path().join("store")).await?;
1050        let adapter = OpencodeAdapter::new(&source);
1051        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1052
1053        append_fresh_opencode_turn(&source)?;
1054        // Watermark sits just below the appended message's timestamp.
1055        let oracle = FixedOracle {
1056            session_id: FRESH_SESSION_ID,
1057            watermark_micros: FRESH_TURN_MICROS - 1,
1058        };
1059        ingest_adapter(&store, &adapter, &oracle, |_| {}).await?;
1060
1061        let session = store
1062            .get_session(FRESH_SESSION_ID)
1063            .await?
1064            .expect("fixture session round-trips");
1065        let fresh = session
1066            .messages
1067            .iter()
1068            .find(|stored| stored.message.id() == FRESH_MESSAGE_ID)
1069            .expect("message newer than the watermark must land");
1070        assert!(
1071            fresh.parts.iter().any(|part| matches!(
1072                &part.kind,
1073                PartKind::Text { text } if text.as_deref().map(|value| value.as_str()) == Some("fresh opencode text")
1074            )),
1075            "fresh message part must land with the re-read session",
1076        );
1077        Ok(())
1078    }
1079
1080    /// A session whose latest message is no newer than the watermark is skipped as
1081    /// `Fresh` - the appended turn is NOT re-read.
1082    #[tokio::test(flavor = "multi_thread")]
1083    async fn freshness_skips_a_session_not_newer_than_the_watermark() -> anyhow::Result<()> {
1084        let temp = TempDir::new()?;
1085        let source = temp.path().join("storage");
1086        copy_dir(std::path::Path::new(FIXTURES), &source)?;
1087
1088        let store = Store::open_local(temp.path().join("store")).await?;
1089        let adapter = OpencodeAdapter::new(&source);
1090        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1091
1092        append_fresh_opencode_turn(&source)?;
1093        // Watermark at/above the appended timestamp: the session is fresh.
1094        let oracle = FixedOracle {
1095            session_id: FRESH_SESSION_ID,
1096            watermark_micros: FRESH_TURN_MICROS,
1097        };
1098        let summary = ingest_adapter(&store, &adapter, &oracle, |_| {}).await?;
1099
1100        assert!(
1101            summary.skipped_fresh >= 1,
1102            "the unchanged-vs-watermark session must be skipped, got {summary:?}",
1103        );
1104        let session = store
1105            .get_session(FRESH_SESSION_ID)
1106            .await?
1107            .expect("fixture session round-trips");
1108        assert!(
1109            !session
1110                .messages
1111                .iter()
1112                .any(|stored| stored.message.id() == FRESH_MESSAGE_ID),
1113            "a skipped session must not re-read the appended turn",
1114        );
1115        Ok(())
1116    }
1117
1118    #[tokio::test(flavor = "multi_thread")]
1119    async fn malformed_part_file_drops_only_that_part() -> anyhow::Result<()> {
1120        let temp = TempDir::new()?;
1121        let source = temp.path().join("storage");
1122        write_minimal_session(&source, "ses_badpart", "msg_badpart")?;
1123        let part_dir = source.join("part").join("msg_badpart");
1124        std::fs::write(part_dir.join("prt_000_bad.json"), b"{not json")?;
1125        write_json_file(
1126            &part_dir.join("prt_999_good.json"),
1127            &json!({
1128                "id": "prt_999_good",
1129                "sessionID": "ses_badpart",
1130                "messageID": "msg_badpart",
1131                "type": "text",
1132                "text": "valid sibling survives",
1133                "synthetic": false,
1134            }),
1135        )?;
1136
1137        let store = Store::open_local(temp.path().join("store")).await?;
1138        let summary = ingest_adapter(
1139            &store,
1140            &OpencodeAdapter::new(&source),
1141            &crate::adapter::NoopOracle,
1142            |_| {},
1143        )
1144        .await?;
1145
1146        assert_eq!(summary.dropped_events, 1);
1147        let session = store
1148            .get_session("ses_badpart")
1149            .await?
1150            .expect("session with one malformed part still lands");
1151        let message = session
1152            .messages
1153            .iter()
1154            .find(|stored| stored.message.id() == "msg_badpart")
1155            .expect("message with valid sibling part still lands");
1156        assert!(message.parts.iter().any(|part| {
1157            matches!(
1158                &part.kind,
1159                PartKind::Text { text }
1160                    if text.as_deref().map(String::as_str) == Some("valid sibling survives")
1161            )
1162        }));
1163        Ok(())
1164    }
1165
1166    #[test]
1167    fn missing_message_timestamp_uses_session_anchor() -> anyhow::Result<()> {
1168        let session_anchor =
1169            DateTime::parse_from_rfc3339("2026-05-05T12:13:14Z")?.with_timezone(&Utc);
1170        let events = build_message_events(
1171            "ses_anchor",
1172            &json!({"id": "msg_no_time", "role": "user"}),
1173            &[],
1174            session_anchor,
1175        )?;
1176
1177        let IngestEvent::Message(message) = &events[0] else {
1178            panic!("first event is the message");
1179        };
1180        assert_eq!(message.timestamp(), session_anchor);
1181        Ok(())
1182    }
1183
1184    #[test]
1185    fn source_part_without_id_is_schema_error() {
1186        let session_anchor = DateTime::from_timestamp_millis(1_765_000_000_000).unwrap();
1187        let error = build_message_events(
1188            "ses_missing_part_id",
1189            &json!({
1190                "id": "msg_missing_part_id",
1191                "role": "assistant",
1192                "time": { "created": 1_765_000_000_000i64 },
1193            }),
1194            &[json!({"type": "text", "text": "cannot restore its filename"})],
1195            session_anchor,
1196        )
1197        .expect_err("part ids are required for native filename replay");
1198
1199        assert!(error.to_string().contains("part missing `id`"));
1200    }
1201
1202    #[test]
1203    fn read_json_bounds_oversized_string_leaves() -> anyhow::Result<()> {
1204        let temp = TempDir::new()?;
1205        let path = temp.path().join("oversized.json");
1206        write_json_file(
1207            &path,
1208            &json!({
1209                "id": "oversized",
1210                "text": "x".repeat(LEAF_CAP + 100),
1211            }),
1212        )?;
1213
1214        let value = read_json(&path)?;
1215        let text = value
1216            .get("text")
1217            .and_then(Value::as_str)
1218            .expect("text leaf survives as a bounded marker");
1219        assert!(text.len() <= LEAF_CAP);
1220        assert!(text.ends_with(&format!("{} bytes>", LEAF_CAP + 100)));
1221        Ok(())
1222    }
1223
1224    #[tokio::test(flavor = "multi_thread")]
1225    async fn opencode_adapter_ingests_fixture_corpus_into_canonical_shape() -> anyhow::Result<()> {
1226        let temp = TempDir::new()?;
1227        let store = Store::open_local(temp.path()).await?;
1228        let adapter = OpencodeAdapter::new(FIXTURES);
1229
1230        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1231        assert!(summary.accepted() > 0, "ingest must accept rows");
1232        assert_eq!(summary.dropped_events, 0, "no per-event drops expected");
1233        assert_eq!(
1234            summary.dropped_sessions, 0,
1235            "no session-level rejections expected"
1236        );
1237
1238        let (sessions, messages, parts) = store.row_counts().await?;
1239        assert!(sessions > 0, "at least one opencode session");
1240        assert!(messages > 0, "at least one opencode message");
1241        assert!(parts > 0, "at least one opencode Part");
1242
1243        let mut saw_call = false;
1244        let mut saw_result = false;
1245        let mut saw_injected_text = false;
1246        for session_id in store.session_ids().await? {
1247            let session = store
1248                .get_session(&session_id)
1249                .await?
1250                .expect("session round-trips");
1251            assert_eq!(session.session.source_agent, NAME);
1252            assert!(
1253                !(*session.session.project).is_empty(),
1254                "spec.md#model-project-non-empty",
1255            );
1256            for stored in &session.messages {
1257                for part in &stored.parts {
1258                    match &part.kind {
1259                        PartKind::ToolCall { .. } => saw_call = true,
1260                        PartKind::ToolResult { .. } => saw_result = true,
1261                        PartKind::Text { .. } if part.provenance == Provenance::Injected => {
1262                            saw_injected_text = true;
1263                        }
1264                        _ => {}
1265                    }
1266                }
1267            }
1268        }
1269        assert!(saw_call, "fused tool parts yield ToolCall on the assistant");
1270        assert!(
1271            saw_result,
1272            "fused tool parts split off a ToolResult on a Tool message",
1273        );
1274        assert!(
1275            saw_injected_text,
1276            "spec.md#model-part-provenance: synthetic text parts are injected",
1277        );
1278        Ok(())
1279    }
1280
1281    /// The synthetic `Tool` message a `tool` part splits off must carry a
1282    /// `Tool` role with one `ToolResult`, and must NOT collide with or
1283    /// overwrite the assistant message that owns the `ToolCall`.
1284    #[tokio::test(flavor = "multi_thread")]
1285    async fn fused_tool_part_splits_into_call_and_result() -> anyhow::Result<()> {
1286        let temp = TempDir::new()?;
1287        let store = Store::open_local(temp.path()).await?;
1288        let adapter = OpencodeAdapter::new(FIXTURES);
1289        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1290
1291        let mut call_ids = std::collections::HashSet::new();
1292        let mut result_ids = std::collections::HashSet::new();
1293        let mut saw_failure = false;
1294        for session_id in store.session_ids().await? {
1295            let session = store
1296                .get_session(&session_id)
1297                .await?
1298                .expect("session round-trips");
1299            for stored in &session.messages {
1300                for part in &stored.parts {
1301                    match &part.kind {
1302                        PartKind::ToolCall { call_id, .. } => {
1303                            if let Some(id) = call_id.as_deref() {
1304                                call_ids.insert(id.clone());
1305                            }
1306                        }
1307                        PartKind::ToolResult {
1308                            call_id,
1309                            is_failure,
1310                            result,
1311                            ..
1312                        } => {
1313                            assert!(
1314                                matches!(stored.message, Message::Tool { .. }),
1315                                "a ToolResult must live on a Tool-role message",
1316                            );
1317                            if *is_failure {
1318                                saw_failure = true;
1319                                assert_ne!(
1320                                    result,
1321                                    &Value::Null,
1322                                    "failed tool results must carry the source error/output payload",
1323                                );
1324                            }
1325                            if let Some(id) = call_id.as_deref() {
1326                                result_ids.insert(id.clone());
1327                            }
1328                        }
1329                        _ => {}
1330                    }
1331                }
1332            }
1333        }
1334        assert!(!call_ids.is_empty(), "corpus has tool calls");
1335        assert_eq!(
1336            call_ids, result_ids,
1337            "every tool call's id is matched by its split-off result",
1338        );
1339        assert!(
1340            saw_failure,
1341            "fixture has at least one failed opencode tool result"
1342        );
1343        Ok(())
1344    }
1345
1346    #[tokio::test(flavor = "multi_thread")]
1347    async fn foreign_serialization_reparses_as_opencode() -> anyhow::Result<()> {
1348        let temp = TempDir::new()?;
1349        let origin_store = Store::open_local(temp.path().join("origin-store")).await?;
1350        let origin = crate::adapter::PiCodingAgentAdapter::new(concat!(
1351            env!("CARGO_MANIFEST_DIR"),
1352            "/tests/fixtures/adapter/pi-coding-agent/sessions"
1353        ));
1354        ingest_adapter(&origin_store, &origin, &crate::adapter::NoopOracle, |_| {}).await?;
1355        let session_id = origin_store
1356            .session_ids()
1357            .await?
1358            .into_iter()
1359            .next()
1360            .expect("pi fixture has sessions");
1361        let session = origin_store
1362            .get_session(&session_id)
1363            .await?
1364            .expect("fixture session is readable");
1365
1366        let restored_root = temp.path().join("opencode-storage");
1367        crate::adapter::write_restored_files(
1368            &restored_root,
1369            OpencodeFactory.serialize(&session, RestoreFidelity::Foreign)?,
1370        )?;
1371        let restored_store = Store::open_local(temp.path().join("restored-store")).await?;
1372        let summary = ingest_adapter(
1373            &restored_store,
1374            &OpencodeAdapter::new(&restored_root),
1375            &crate::adapter::NoopOracle,
1376            |_| {},
1377        )
1378        .await?;
1379
1380        assert!(summary.accepted() > 0);
1381        assert_eq!(summary.dropped_events, 0);
1382        Ok(())
1383    }
1384
1385    #[test]
1386    fn path_ids_reject_separators_and_traversal() {
1387        let where_ = "session/project/session.json";
1388        assert!(validate_path_id(NAME, "session id", "ses_safe", where_).is_ok());
1389        assert!(validate_path_id(NAME, "session id", "../ses", where_).is_err());
1390        assert!(validate_path_id(NAME, "session id", "/tmp/ses", where_).is_err());
1391        assert!(validate_path_id(NAME, "message id", "msg/a", where_).is_err());
1392        assert!(validate_path_id(NAME, "message id", "msg\\a", where_).is_err());
1393    }
1394
1395    fn append_fresh_opencode_turn(root: &std::path::Path) -> anyhow::Result<()> {
1396        let message_dir = root.join("message").join(FRESH_SESSION_ID);
1397        let part_dir = root.join("part").join(FRESH_MESSAGE_ID);
1398        std::fs::create_dir_all(&message_dir)?;
1399        std::fs::create_dir_all(&part_dir)?;
1400        std::fs::write(
1401            message_dir.join(format!("{FRESH_MESSAGE_ID}.json")),
1402            serde_json::to_vec(&json!({
1403                "id": FRESH_MESSAGE_ID,
1404                "sessionID": FRESH_SESSION_ID,
1405                "role": "user",
1406                "time": { "created": 1759859999000i64 }
1407            }))?,
1408        )?;
1409        std::fs::write(
1410            part_dir.join(format!("{FRESH_PART_ID}.json")),
1411            serde_json::to_vec(&json!({
1412                "id": FRESH_PART_ID,
1413                "sessionID": FRESH_SESSION_ID,
1414                "messageID": FRESH_MESSAGE_ID,
1415                "type": "text",
1416                "text": "fresh opencode text",
1417                "synthetic": false
1418            }))?,
1419        )?;
1420        Ok(())
1421    }
1422
1423    fn write_minimal_session(
1424        root: &std::path::Path,
1425        session_id: &str,
1426        message_id: &str,
1427    ) -> anyhow::Result<()> {
1428        write_json_file(
1429            &root
1430                .join("session")
1431                .join("project")
1432                .join(format!("{session_id}.json")),
1433            &json!({
1434                "id": session_id,
1435                "projectID": "project",
1436                "directory": "/tmp/project",
1437                "time": { "created": 1_765_000_000_000i64, "updated": 1_765_000_000_000i64 },
1438            }),
1439        )?;
1440        write_json_file(
1441            &root
1442                .join("message")
1443                .join(session_id)
1444                .join(format!("{message_id}.json")),
1445            &json!({
1446                "id": message_id,
1447                "sessionID": session_id,
1448                "role": "assistant",
1449                "time": { "created": 1_765_000_000_001i64 },
1450            }),
1451        )?;
1452        std::fs::create_dir_all(root.join("part").join(message_id))?;
1453        Ok(())
1454    }
1455
1456    fn write_json_file(path: &std::path::Path, value: &Value) -> anyhow::Result<()> {
1457        if let Some(parent) = path.parent() {
1458            std::fs::create_dir_all(parent)?;
1459        }
1460        std::fs::write(path, serde_json::to_vec(value)?)?;
1461        Ok(())
1462    }
1463
1464    fn copy_dir(from: &std::path::Path, to: &std::path::Path) -> anyhow::Result<()> {
1465        std::fs::create_dir_all(to)?;
1466        for entry in std::fs::read_dir(from)? {
1467            let entry = entry?;
1468            let source = entry.path();
1469            let target = to.join(entry.file_name());
1470            if entry.file_type()?.is_dir() {
1471                copy_dir(&source, &target)?;
1472            } else {
1473                std::fs::copy(&source, &target)?;
1474            }
1475        }
1476        Ok(())
1477    }
1478}