1use std::path::{Path, PathBuf};
24
25use async_stream::stream;
26use chrono::{DateTime, Utc};
27use serde_json::{Value, json};
28use tokio::sync::mpsc;
29
30use crate::{
31 sessions::IngestEvent,
32 wire::{FileData, Message, Part, PartKind, Provenance, ProviderOptions, Session},
33};
34
35use super::{
36 Adapter, AdapterError, AdapterFactory, AdapterYield, AdapterYieldStream, DiscoverFuture, Env,
37 RestoreFidelity, RestoredFile, SkipOracle, SkipReason, compact_json, config_path,
38 extract::{bound_value, extract_str},
39 jsonl::RECORD_CAP,
40 part_id, part_ordinal, raw_record, source_options, validate_path_id,
41};
42
/// Adapter identifier. It is returned by `OpencodeFactory::name`, passed to every
/// `AdapterError` constructor in this file, and stored as `Session::source_agent`.
const NAME: &str = "opencode";

/// Capacity of the bounded `mpsc` channel that carries items from the blocking
/// reader task to the async event stream in `events_with`.
const CHANNEL_CAP: usize = 256;
48
49pub struct OpencodeFactory;
52
53impl AdapterFactory for OpencodeFactory {
54 fn name(&self) -> &'static str {
55 NAME
56 }
57
58 fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
59 Ok(Box::new(OpencodeAdapter::new(config_path(NAME, config)?)))
60 }
61
62 fn probe_default(&self, env: &Env) -> Option<Value> {
63 let path = env
64 .home
65 .join(".local")
66 .join("share")
67 .join("opencode")
68 .join("storage");
69 path.exists().then(|| json!({ "path": path }))
70 }
71
72 fn serialize(
73 &self,
74 session: &crate::sessions::SessionWithMessages,
75 fidelity: RestoreFidelity,
76 ) -> Result<Vec<RestoredFile>, AdapterError> {
77 match fidelity {
78 RestoreFidelity::Native => serialize_native(session),
79 RestoreFidelity::Foreign => serialize_foreign(session),
80 }
81 }
82}
83
/// Adapter over an opencode storage directory.
#[derive(Debug, Clone)]
pub struct OpencodeAdapter {
    /// Storage root; the code in this file reads the `session`, `message` and
    /// `part` subdirectories beneath it.
    root: PathBuf,
}
89
90impl OpencodeAdapter {
91 pub fn new(root: impl Into<PathBuf>) -> Self {
92 Self { root: root.into() }
93 }
94}
95
impl Adapter for OpencodeAdapter {
    /// Counts the session files under the storage root.
    ///
    /// The directory scan runs on the blocking thread pool; a failed join is
    /// converted with `join_error`.
    fn discover(&self) -> DiscoverFuture<'_> {
        let root = self.root.clone();
        Box::pin(async move {
            tokio::task::spawn_blocking(move || {
                collect_session_files(&root).map(|files| files.len())
            })
            .await
            .map_err(join_error)?
        })
    }

    /// Streams ingest events for every session, skipping sessions the `oracle`
    /// reports as already ingested and unchanged since.
    ///
    /// All filesystem work is pushed to `spawn_blocking`; the stream itself
    /// only awaits and yields.
    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
        let adapter = self.clone();
        Box::pin(stream! {
            // Phase 1: enumerate session files. Any failure here ends the stream.
            let files = {
                let root = adapter.root.clone();
                tokio::task::spawn_blocking(move || collect_session_files(&root)).await
            };
            let files = match files {
                Ok(Ok(files)) => files,
                Ok(Err(error)) => { yield Err(error); return; }
                Err(join) => { yield Err(join_error(join)); return; }
            };

            // Phase 2: freshness filter. Only sessions with a recorded ingest
            // time are walked; sessions without one go straight to the reader.
            let mut survivors = Vec::with_capacity(files.len());
            for mut file in files {
                if let Some(ingested) = oracle.last_ingested_at(&file.session_id) {
                    let walk = {
                        let root = adapter.root.clone();
                        let session_path = file.path.clone();
                        let session_id = file.session_id.clone();
                        tokio::task::spawn_blocking(move || {
                            walk_session_subtree(&root, &session_path, &session_id)
                        })
                        .await
                    };
                    let walk = match walk {
                        Ok(Ok(walk)) => walk,
                        Ok(Err(error)) => { yield Err(error); return; }
                        Err(join) => { yield Err(join_error(join)); return; }
                    };
                    // Skip only when a newest mtime is known and it is not
                    // newer than the last ingest; an unknown mtime forces a re-read.
                    if let Some(mtime) = walk.newest_mtime
                        && mtime <= ingested
                    {
                        yield Ok(AdapterYield::Skipped {
                            session_id: Some(file.session_id.clone()),
                            project: None,
                            reason: SkipReason::Fresh,
                        });
                        continue;
                    }
                    // Keep the directory listing so the reader does not repeat it.
                    file.cached_subtree = Some(walk);
                }
                survivors.push(file);
            }

            // Phase 3: read the surviving sessions on a blocking task and
            // forward its output through a bounded channel.
            let (tx, mut rx) = mpsc::channel(CHANNEL_CAP);
            let reader = adapter.clone();
            let handle = tokio::task::spawn_blocking(move || read_sessions(&reader, survivors, &tx));
            while let Some(item) = rx.recv().await {
                yield item;
            }
            // The channel closes when the task drops `tx`; report a failed join.
            if let Err(join) = handle.await {
                yield Err(join_error(join));
            }
        })
    }
}
170
171fn join_error(join: tokio::task::JoinError) -> AdapterError {
174 AdapterError::io(
175 NAME,
176 "blocking read task",
177 std::io::Error::other(join.to_string()),
178 )
179}
180
/// One discovered session file plus an optional pre-computed directory listing.
struct SessionFile {
    /// File stem of the session file, as collected by `collect_session_files`.
    session_id: String,
    /// Path of the session JSON file.
    path: PathBuf,
    /// Listing produced by the freshness walk in `events_with`; `None` when the
    /// session was not walked. `read_one_session` reuses it when present.
    cached_subtree: Option<SubtreeWalk>,
}
189
/// Result of walking one session's message and part files.
struct SubtreeWalk {
    /// Newest modification time seen across the session file, its message files
    /// and their part files; `None` when no mtime could be read.
    newest_mtime: Option<DateTime<Utc>>,
    /// Sorted message file paths for the session.
    message_files: Vec<PathBuf>,
    /// Sorted part file paths per message; entry `i` belongs to `message_files[i]`.
    part_files_by_message: Vec<Vec<PathBuf>>,
}
200
201fn collect_session_files(root: &Path) -> Result<Vec<SessionFile>, AdapterError> {
204 let session_root = root.join("session");
205 let io = |path: &Path, source| AdapterError::io(NAME, path.display().to_string(), source);
206 let entries = match std::fs::read_dir(&session_root) {
207 Ok(entries) => entries,
208 Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
209 Err(error) => return Err(io(&session_root, error)),
210 };
211 let mut out = Vec::new();
212 for project in entries {
213 let project = project.map_err(|error| io(&session_root, error))?;
214 if !project
215 .file_type()
216 .map_err(|error| io(&project.path(), error))?
217 .is_dir()
218 {
219 continue;
220 }
221 let project_dir = project.path();
222 for session in std::fs::read_dir(&project_dir).map_err(|error| io(&project_dir, error))? {
223 let session = session.map_err(|error| io(&project_dir, error))?;
224 let path = session.path();
225 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
226 continue;
227 }
228 let Some(session_id) = path
229 .file_stem()
230 .and_then(|s| s.to_str())
231 .map(ToOwned::to_owned)
232 else {
233 continue;
234 };
235 validate_path_id(
236 NAME,
237 "session file name",
238 &session_id,
239 path.display().to_string(),
240 )?;
241 out.push(SessionFile {
242 session_id,
243 path,
244 cached_subtree: None,
245 });
246 }
247 }
248 out.sort_by(|a, b| a.path.cmp(&b.path));
249 Ok(out)
250}
251
252fn walk_session_subtree(
256 root: &Path,
257 session_path: &Path,
258 session_id: &str,
259) -> Result<SubtreeWalk, AdapterError> {
260 let mut newest = json_mtime(session_path);
261 let message_dir = root.join("message").join(session_id);
262 let message_files = list_json_sorted(&message_dir)?;
263 let mut part_files_by_message = Vec::with_capacity(message_files.len());
264 for message_path in &message_files {
265 newest = newest.max(json_mtime(message_path));
266 let Some(message_id) = message_path.file_stem().and_then(|stem| stem.to_str()) else {
267 part_files_by_message.push(Vec::new());
268 continue;
269 };
270 validate_path_id(
271 NAME,
272 "message file name",
273 message_id,
274 message_path.display().to_string(),
275 )?;
276 let part_dir = root.join("part").join(message_id);
277 let parts = list_json_sorted(&part_dir)?;
278 for part_path in &parts {
279 newest = newest.max(json_mtime(part_path));
280 }
281 part_files_by_message.push(parts);
282 }
283 Ok(SubtreeWalk {
284 newest_mtime: newest,
285 message_files,
286 part_files_by_message,
287 })
288}
289
290fn json_mtime(path: &Path) -> Option<DateTime<Utc>> {
291 std::fs::metadata(path)
292 .and_then(|meta| meta.modified())
293 .ok()
294 .map(DateTime::<Utc>::from)
295}
296
297fn read_sessions(
298 adapter: &OpencodeAdapter,
299 sessions: Vec<SessionFile>,
300 tx: &mpsc::Sender<Result<AdapterYield, AdapterError>>,
301) {
302 for session in sessions {
303 if !read_one_session(adapter, session, tx) {
304 return;
305 }
306 }
307}
308
/// Reads one session file with its messages and parts and sends the resulting
/// events (or errors) to `tx`.
///
/// Returns `false` only when a send fails, i.e. the receiving side is gone and
/// the caller should stop. Returns `true` otherwise, including when the session
/// itself was abandoned after an error was emitted.
fn read_one_session(
    adapter: &OpencodeAdapter,
    file: SessionFile,
    tx: &mpsc::Sender<Result<AdapterYield, AdapterError>>,
) -> bool {
    // Sends one item; on a closed channel returns `false` from the enclosing
    // function (note the hidden early return).
    macro_rules! emit {
        ($item:expr) => {
            if tx.blocking_send($item).is_err() {
                return false;
            }
        };
    }

    // Session-level failures drop the whole session but keep the run going.
    let session_value = match read_json(&file.path) {
        Ok(value) => value,
        Err(error) => {
            emit!(Err(error));
            return true;
        }
    };
    let session = match session_from_value(&session_value, &file.path) {
        Ok(session) => session,
        Err(error) => {
            emit!(Err(error));
            return true;
        }
    };
    let session_id = session.id.clone();
    // The id inside the JSON is used to build the message directory path below,
    // so it is validated in addition to the file name.
    if let Err(error) = validate_path_id(
        NAME,
        "session id",
        &session_id,
        file.path.display().to_string(),
    ) {
        emit!(Err(error));
        return true;
    }
    let session_created_at = session.created_at;
    emit!(Ok(AdapterYield::Event(IngestEvent::Session(session))));

    // Reuse the listing from the freshness walk when present; otherwise list
    // the message directory now and list part directories lazily per message.
    let (message_files, mut part_files_by_message) = match file.cached_subtree {
        Some(walk) => (walk.message_files, walk.part_files_by_message),
        None => {
            let message_dir = adapter.root.join("message").join(&session_id);
            let files = match list_json_sorted(&message_dir) {
                Ok(files) => files,
                Err(error) => {
                    emit!(Err(error));
                    return true;
                }
            };
            (files, Vec::new())
        }
    };
    // A cached walk has one part list per message file, so a non-empty vector
    // means indexing by message position is valid.
    let use_cache = !part_files_by_message.is_empty();

    for (index, message_path) in message_files.iter().enumerate() {
        // Message-level failures drop only that message.
        let message_value = match read_json(message_path) {
            Ok(value) => value,
            Err(error) => {
                emit!(Err(error));
                continue;
            }
        };
        let Some(message_id) = message_value.get("id").and_then(Value::as_str) else {
            emit!(Err(AdapterError::schema(
                NAME,
                message_path.display().to_string(),
                "message file missing `id`",
            )));
            continue;
        };
        if let Err(error) = validate_path_id(
            NAME,
            "message id",
            message_id,
            message_path.display().to_string(),
        ) {
            emit!(Err(error));
            continue;
        }
        let part_files = if use_cache {
            // Move the cached list out, leaving an empty vector behind.
            std::mem::take(&mut part_files_by_message[index])
        } else {
            let part_dir = adapter.root.join("part").join(message_id);
            match list_json_sorted(&part_dir) {
                Ok(files) => files,
                Err(error) => {
                    emit!(Err(error));
                    continue;
                }
            }
        };
        // An unreadable part is reported and dropped; its siblings still land.
        let mut parts = Vec::with_capacity(part_files.len());
        for part_path in part_files {
            match read_json(&part_path) {
                Ok(value) => parts.push(value),
                Err(error) => emit!(Err(error)),
            }
        }
        match build_message_events(&session_id, &message_value, &parts, session_created_at) {
            Ok(events) => {
                for event in events {
                    emit!(Ok(AdapterYield::Event(event)));
                }
            }
            Err(error) => emit!(Err(error)),
        }
    }
    true
}
422
423fn read_json(path: &Path) -> Result<Value, AdapterError> {
428 use std::io::Read;
429 let io = |source| AdapterError::io(NAME, path.display().to_string(), source);
430 let mut file = std::fs::File::open(path).map_err(io)?;
431 let len = file.metadata().map_err(io)?.len();
432 if len > RECORD_CAP as u64 {
433 return Err(AdapterError::schema(
434 NAME,
435 path.display().to_string(),
436 format!("json file exceeds adapter record cap: {len} bytes > {RECORD_CAP}"),
437 ));
438 }
439 let mut bytes = Vec::with_capacity(len as usize);
440 file.read_to_end(&mut bytes).map_err(io)?;
441 let mut value: Value = serde_json::from_slice(&bytes)
442 .map_err(|error| AdapterError::parse(NAME, path.display().to_string(), 1, error))?;
443 bound_value(&mut value);
444 Ok(value)
445}
446
447fn list_json_sorted(dir: &Path) -> Result<Vec<PathBuf>, AdapterError> {
451 let entries = match std::fs::read_dir(dir) {
452 Ok(entries) => entries,
453 Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
454 Err(error) => return Err(AdapterError::io(NAME, dir.display().to_string(), error)),
455 };
456 let mut out = Vec::new();
457 for entry in entries {
458 let entry =
459 entry.map_err(|error| AdapterError::io(NAME, dir.display().to_string(), error))?;
460 let path = entry.path();
461 if path.extension().and_then(|ext| ext.to_str()) == Some("json") {
462 out.push(path);
463 }
464 }
465 out.sort();
466 Ok(out)
467}
468
469fn session_from_value(value: &Value, path: &Path) -> Result<Session, AdapterError> {
470 let display = path.display().to_string();
471 let id = value
472 .get("id")
473 .and_then(Value::as_str)
474 .ok_or_else(|| AdapterError::schema(NAME, display.clone(), "session missing `id`"))?
475 .to_owned();
476 let created_at = millis_at(value, &["time", "created"]).ok_or_else(|| {
477 AdapterError::schema(NAME, display.clone(), "session missing `time.created`")
478 })?;
479 let project = extract_str(value, "directory")
482 .ok_or_else(|| AdapterError::schema(NAME, display, "session missing `directory`"))?;
483
484 let options = opencode_raw(value);
485
486 Ok(Session {
487 id,
488 parent_session_id: value
491 .get("parentID")
492 .and_then(Value::as_str)
493 .map(ToOwned::to_owned),
494 parent_message_id: None,
495 source_agent: NAME.to_owned(),
496 created_at,
497 project,
498 options,
499 })
500}
501
502fn build_message_events(
506 session_id: &str,
507 message_value: &Value,
508 part_values: &[Value],
509 default_timestamp: DateTime<Utc>,
510) -> Result<Vec<IngestEvent>, AdapterError> {
511 let message_id = message_value
512 .get("id")
513 .and_then(Value::as_str)
514 .ok_or_else(|| AdapterError::schema(NAME, session_id.to_owned(), "message missing `id`"))?;
515 let role = message_value.get("role").and_then(Value::as_str);
516 let timestamp = millis_at(message_value, &["time", "created"]).unwrap_or(default_timestamp);
517
518 let options = opencode_raw(message_value);
519 let message = match role {
520 Some("user") => Message::User {
521 id: message_id.to_owned(),
522 session_id: session_id.to_owned(),
523 timestamp,
524 options,
525 },
526 Some("assistant") => Message::Assistant {
527 id: message_id.to_owned(),
528 session_id: session_id.to_owned(),
529 timestamp,
530 options,
531 },
532 _ => Message::System {
537 id: message_id.to_owned(),
538 session_id: session_id.to_owned(),
539 timestamp,
540 content: extract_str(message_value, "role"),
541 options,
542 },
543 };
544
545 let mut events = vec![IngestEvent::Message(message)];
546 let mut deferred = Vec::new();
547 for (ordinal, part_value) in part_values.iter().enumerate() {
548 let mapped = map_part(session_id, message_id, ordinal, part_value, timestamp)?;
549 events.push(IngestEvent::Part(mapped.part));
550 if let Some(split) = mapped.tool_split {
551 deferred.push(split);
552 }
553 }
554 for ToolSplit {
555 message: tool_message,
556 result,
557 } in deferred
558 {
559 events.push(IngestEvent::Message(tool_message));
560 events.push(IngestEvent::Part(result));
561 }
562 Ok(events)
563}
564
/// Result of mapping one opencode part record.
struct MappedPart {
    /// The part that stays on the source message.
    part: Part,
    /// For `tool` parts, the synthesized tool message and result part;
    /// `None` for every other part type.
    tool_split: Option<ToolSplit>,
}
571
/// The result half split off from an opencode `tool` part.
struct ToolSplit {
    /// Synthesized `Message::Tool` that owns the result part.
    message: Message,
    /// `ToolResult` part attached to `message`.
    result: Part,
}
576
577fn map_part(
578 session_id: &str,
579 message_id: &str,
580 ordinal: usize,
581 value: &Value,
582 message_ts: DateTime<Utc>,
583) -> Result<MappedPart, AdapterError> {
584 let kind = value.get("type").and_then(Value::as_str);
585 let id = value
586 .get("id")
587 .and_then(Value::as_str)
588 .ok_or_else(|| AdapterError::schema(NAME, message_id.to_owned(), "part missing `id`"))?
589 .to_owned();
590
591 if kind == Some("tool") {
592 return Ok(tool_part(
593 session_id, message_id, &id, ordinal, value, message_ts,
594 ));
595 }
596
597 let (provenance, part_kind) = match kind {
598 Some("text") => (text_provenance(value), text_kind(value)),
599 Some("reasoning") => (Provenance::Conversational, reasoning_kind(value)),
600 Some("file") => (Provenance::Conversational, file_kind(value)),
601 _ => (Provenance::Injected, PartKind::Text { text: None }),
605 };
606
607 Ok(MappedPart {
608 part: Part {
609 session_id: session_id.to_owned(),
610 id,
611 message_id: message_id.to_owned(),
612 ordinal: part_ordinal(ordinal),
613 provenance,
614 options: opencode_raw(value),
615 kind: part_kind,
616 },
617 tool_split: None,
618 })
619}
620
621fn text_provenance(value: &Value) -> Provenance {
625 if value.get("synthetic").and_then(Value::as_bool) == Some(true) {
626 Provenance::Injected
627 } else {
628 Provenance::Conversational
629 }
630}
631
632fn text_kind(value: &Value) -> PartKind {
633 PartKind::Text {
634 text: extract_str(value, "text"),
635 }
636}
637
638fn reasoning_kind(value: &Value) -> PartKind {
639 PartKind::Reasoning {
640 text: extract_str(value, "text"),
641 }
642}
643
644fn file_kind(value: &Value) -> PartKind {
645 let media_type = value
648 .get("mime")
649 .and_then(Value::as_str)
650 .map(ToOwned::to_owned);
651 let file_name = value
652 .get("filename")
653 .and_then(Value::as_str)
654 .map(ToOwned::to_owned);
655 let data = match value.get("url").and_then(Value::as_str) {
656 Some(url) => FileData::Url(url.to_owned()),
657 None => FileData::String(compact_json(value)),
658 };
659 PartKind::File {
660 media_type,
661 file_name,
662 data,
663 }
664}
665
/// Splits an opencode `tool` part, which stores call and outcome together under
/// `state`, into a `ToolCall` part plus a synthesized tool message carrying a
/// `ToolResult` part.
///
/// The tool message id is `"{id}/result"`; its timestamp is `state.time.end`
/// when present, otherwise `message_ts`. Both synthesized records carry the
/// `synthetic` marker options instead of the raw record.
fn tool_part(
    session_id: &str,
    message_id: &str,
    id: &str,
    ordinal: usize,
    value: &Value,
    message_ts: DateTime<Utc>,
) -> MappedPart {
    let call_id = extract_str(value, "callID");
    let name = extract_str(value, "tool");
    let state = value.get("state");
    let status = state.and_then(|s| s.get("status")).and_then(Value::as_str);
    let result_ts = millis_at(value, &["state", "time", "end"]).unwrap_or(message_ts);

    // Work on an owned copy of `state` so fields can be moved out of it.
    let mut owned_state = state.cloned().unwrap_or(Value::Null);
    let (input, result) = match owned_state.as_object_mut() {
        Some(map) => {
            let input = map.remove("input").unwrap_or(Value::Null);
            // Result preference: `output`, then `error`, then whatever is left
            // of the state object after `input` was removed.
            let result = map
                .remove("output")
                .or_else(|| map.remove("error"))
                .unwrap_or_else(|| {
                    // `map` is no longer used at this point, so the whole
                    // state can be taken (leaving `Null` behind).
                    std::mem::take(&mut owned_state)
                });
            (input, result)
        }
        // Missing or non-object state: no input and no result payload.
        None => (Value::Null, Value::Null),
    };

    let tool_call = Part {
        session_id: session_id.to_owned(),
        id: id.to_owned(),
        message_id: message_id.to_owned(),
        ordinal: part_ordinal(ordinal),
        provenance: Provenance::Conversational,
        // The call part keeps the full source record.
        options: opencode_raw(value),
        kind: PartKind::ToolCall {
            call_id: call_id.clone(),
            name: name.clone(),
            params: input,
            provider_executed: false,
        },
    };

    let tool_message_id = format!("{id}/result");
    let tool_message = Message::Tool {
        id: tool_message_id.clone(),
        session_id: session_id.to_owned(),
        timestamp: result_ts,
        options: synthetic_options(),
    };
    let result_part = Part {
        session_id: session_id.to_owned(),
        id: part_id(&tool_message_id, 0),
        message_id: tool_message_id,
        ordinal: 0,
        provenance: Provenance::Injected,
        options: synthetic_options(),
        kind: PartKind::ToolResult {
            call_id,
            name,
            // Only an explicit `"error"` status marks the result as failed.
            is_failure: status == Some("error"),
            result,
        },
    };

    MappedPart {
        part: tool_call,
        tool_split: Some(ToolSplit {
            message: tool_message,
            result: result_part,
        }),
    }
}
752
/// Builds provider options for `value` by calling `source_options` with this
/// adapter's name.
#[inline]
fn opencode_raw(value: &Value) -> ProviderOptions {
    source_options(NAME, value)
}
757
758fn synthetic_options() -> ProviderOptions {
762 let mut options = ProviderOptions::new();
763 options.insert("opencode".to_owned(), json!({ "synthetic": true }));
764 options
765}
766
767fn millis_at(value: &Value, path: &[&str]) -> Option<DateTime<Utc>> {
768 let mut cursor = value;
769 for key in path {
770 cursor = cursor.get(key)?;
771 }
772 DateTime::from_timestamp_millis(cursor.as_i64()?)
773}
774
775fn is_synthetic(options: &ProviderOptions) -> bool {
776 options
777 .get("opencode")
778 .and_then(|o| o.get("synthetic"))
779 .and_then(Value::as_bool)
780 == Some(true)
781}
782
783fn serialize_native(
784 session: &crate::sessions::SessionWithMessages,
785) -> Result<Vec<RestoredFile>, AdapterError> {
786 let Some(session_raw) = raw_record(&session.session.options) else {
797 return serialize_foreign(session);
798 };
799 let project_id = session_raw
800 .get("projectID")
801 .and_then(Value::as_str)
802 .ok_or_else(|| {
803 AdapterError::schema(
804 NAME,
805 session.session.id.clone(),
806 "stored session raw_record missing projectID",
807 )
808 })?;
809
810 let mut files = vec![RestoredFile::new(
811 PathBuf::from("session")
812 .join(project_id)
813 .join(format!("{}.json", session.session.id)),
814 encode(&session_raw, &session.session.id)?,
815 RestoreFidelity::Native,
816 )];
817
818 for message in &session.messages {
819 if !is_synthetic(message.message.options())
820 && let Some(raw) = raw_record(message.message.options())
821 {
822 files.push(RestoredFile::new(
823 PathBuf::from("message")
824 .join(&session.session.id)
825 .join(format!("{}.json", message.message.id())),
826 encode(&raw, message.message.id())?,
827 RestoreFidelity::Native,
828 ));
829 }
830 for part in &message.parts {
831 if let Some(raw) = raw_record(&part.options) {
834 files.push(RestoredFile::new(
835 PathBuf::from("part")
836 .join(&part.message_id)
837 .join(format!("{}.json", part.id)),
838 encode(&raw, &part.id)?,
839 RestoreFidelity::Native,
840 ));
841 }
842 }
843 }
844 Ok(files)
845}
846
847fn serialize_foreign(
848 session: &crate::sessions::SessionWithMessages,
849) -> Result<Vec<RestoredFile>, AdapterError> {
850 let project_id = encode_project(&session.session.project);
856 let created = session.session.created_at.timestamp_millis();
857 let session_record = json!({
858 "id": session.session.id,
859 "projectID": project_id,
860 "directory": &*session.session.project,
861 "time": { "created": created, "updated": created },
862 });
863 let mut files = vec![RestoredFile::new(
864 PathBuf::from("session")
865 .join(&project_id)
866 .join(format!("{}.json", session.session.id)),
867 encode(&session_record, &session.session.id)?,
868 RestoreFidelity::Foreign,
869 )];
870
871 for message in &session.messages {
872 let role = match message.message {
873 Message::User { .. } => "user",
874 Message::Assistant { .. } => "assistant",
875 Message::Tool { .. } | Message::System { .. } => continue,
877 };
878 let created = message.message.timestamp().timestamp_millis();
879 let record = json!({
880 "id": message.message.id(),
881 "sessionID": session.session.id,
882 "role": role,
883 "time": { "created": created },
884 });
885 files.push(RestoredFile::new(
886 PathBuf::from("message")
887 .join(&session.session.id)
888 .join(format!("{}.json", message.message.id())),
889 encode(&record, message.message.id())?,
890 RestoreFidelity::Foreign,
891 ));
892 for part in &message.parts {
893 let Some(record) = foreign_part(&session.session.id, part) else {
894 continue;
895 };
896 files.push(RestoredFile::new(
897 PathBuf::from("part")
898 .join(message.message.id())
899 .join(format!("{}.json", part.id)),
900 encode(&record, &part.id)?,
901 RestoreFidelity::Foreign,
902 ));
903 }
904 }
905 Ok(files)
906}
907
908fn foreign_part(session_id: &str, part: &Part) -> Option<Value> {
909 let mut record = match &part.kind {
910 PartKind::Text { text } => json!({
911 "type": "text",
912 "text": text.as_deref().map(|t| &**t),
913 "synthetic": part.provenance == Provenance::Injected,
914 }),
915 PartKind::Reasoning { text } => json!({
916 "type": "reasoning",
917 "text": text.as_deref().map(|t| &**t),
918 }),
919 PartKind::File {
920 media_type,
921 file_name,
922 data,
923 } => json!({
924 "type": "file",
925 "mime": media_type,
926 "filename": file_name,
927 "url": match data {
928 FileData::Url(url) => Some(url.clone()),
929 _ => None,
930 },
931 }),
932 PartKind::ToolCall {
933 call_id,
934 name,
935 params,
936 ..
937 } => json!({
938 "type": "tool",
939 "callID": call_id.as_deref().map(|c| &**c),
940 "tool": name.as_deref().map(|n| &**n),
941 "state": { "status": "completed", "input": params },
942 }),
943 _ => return None,
945 };
946 if let Value::Object(map) = &mut record {
947 map.insert("id".to_owned(), json!(part.id));
948 map.insert("sessionID".to_owned(), json!(session_id));
949 map.insert("messageID".to_owned(), json!(part.message_id));
950 }
951 Some(record)
952}
953
954fn encode(value: &Value, location: &str) -> Result<Vec<u8>, AdapterError> {
955 serde_json::to_vec(value).map_err(|error| {
956 AdapterError::schema(
957 NAME,
958 location.to_owned(),
959 format!("json encode failed: {error}"),
960 )
961 })
962}
963
/// Turns a project path into a single directory-name-safe token by replacing
/// every character that is not an ASCII letter or digit with `-`.
fn encode_project(project: &str) -> String {
    let mut encoded = String::with_capacity(project.len());
    for ch in project.chars() {
        encoded.push(if ch.is_ascii_alphanumeric() { ch } else { '-' });
    }
    encoded
}
970
971#[cfg(test)]
972mod tests {
973 #![allow(clippy::expect_used, clippy::unwrap_used)]
978
979 use super::*;
980 use crate::{
981 adapter::extract::LEAF_CAP, handlers::ingest_adapter, sessions::Store, wire::PartKind,
982 };
983 use tempfile::TempDir;
984
985 const FIXTURES: &str = "tests/fixtures/adapter/opencode/storage";
986 const FRESH_SESSION_ID: &str = "ses_6405e5a5cffeIG2QHRuTmm4mA7";
987 const FRESH_MESSAGE_ID: &str = "msg_zzzzfresh0001";
988 const FRESH_PART_ID: &str = "prt_zzzzfresh0001";
989
990 struct FixedOracle {
991 session_id: &'static str,
992 ingested_at: DateTime<Utc>,
993 }
994
995 impl crate::adapter::SkipOracle for FixedOracle {
996 fn last_ingested_at(&self, session_id: &str) -> Option<DateTime<Utc>> {
997 (session_id == self.session_id).then_some(self.ingested_at)
998 }
999 }
1000
1001 #[test]
1002 fn probe_default_finds_opencode_storage_under_home() -> anyhow::Result<()> {
1003 crate::adapter::test_support::assert_probe_default(
1004 &OpencodeFactory,
1005 &[".local", "share", "opencode", "storage"],
1006 )
1007 }
1008
1009 #[tokio::test(flavor = "multi_thread")]
1010 async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
1011 let adapter = OpencodeAdapter::new(FIXTURES);
1012 crate::adapter::test_support::assert_native_restore(
1013 &OpencodeFactory,
1014 &adapter,
1015 std::path::Path::new(FIXTURES),
1017 )
1018 .await
1019 }
1020
1021 #[tokio::test(flavor = "multi_thread")]
1022 async fn freshness_uses_message_and_part_file_mtimes() -> anyhow::Result<()> {
1023 let temp = TempDir::new()?;
1024 let source = temp.path().join("storage");
1025 copy_dir(std::path::Path::new(FIXTURES), &source)?;
1026
1027 let store_dir = temp.path().join("store");
1028 let store = Store::open_local(&store_dir).await?;
1029 let adapter = OpencodeAdapter::new(&source);
1030 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1031
1032 let watermark = Utc::now();
1033 std::thread::sleep(std::time::Duration::from_millis(1100));
1034 append_fresh_opencode_turn(&source)?;
1035
1036 let oracle = FixedOracle {
1037 session_id: FRESH_SESSION_ID,
1038 ingested_at: watermark,
1039 };
1040 ingest_adapter(&store, &adapter, &oracle, |_| {}).await?;
1041
1042 let session = store
1043 .get_session(FRESH_SESSION_ID)
1044 .await?
1045 .expect("fixture session round-trips");
1046 let fresh = session
1047 .messages
1048 .iter()
1049 .find(|stored| stored.message.id() == FRESH_MESSAGE_ID)
1050 .expect("message added after the session file mtime must land");
1051 assert!(
1052 fresh.parts.iter().any(|part| matches!(
1053 &part.kind,
1054 PartKind::Text { text } if text.as_deref().map(|value| value.as_str()) == Some("fresh opencode text")
1055 )),
1056 "fresh message part must land with the re-read session",
1057 );
1058 Ok(())
1059 }
1060
1061 #[tokio::test(flavor = "multi_thread")]
1062 async fn malformed_part_file_drops_only_that_part() -> anyhow::Result<()> {
1063 let temp = TempDir::new()?;
1064 let source = temp.path().join("storage");
1065 write_minimal_session(&source, "ses_badpart", "msg_badpart")?;
1066 let part_dir = source.join("part").join("msg_badpart");
1067 std::fs::write(part_dir.join("prt_000_bad.json"), b"{not json")?;
1068 write_json_file(
1069 &part_dir.join("prt_999_good.json"),
1070 &json!({
1071 "id": "prt_999_good",
1072 "sessionID": "ses_badpart",
1073 "messageID": "msg_badpart",
1074 "type": "text",
1075 "text": "valid sibling survives",
1076 "synthetic": false,
1077 }),
1078 )?;
1079
1080 let store = Store::open_local(temp.path().join("store")).await?;
1081 let summary = ingest_adapter(
1082 &store,
1083 &OpencodeAdapter::new(&source),
1084 &crate::adapter::NoopOracle,
1085 |_| {},
1086 )
1087 .await?;
1088
1089 assert_eq!(summary.dropped_events, 1);
1090 let session = store
1091 .get_session("ses_badpart")
1092 .await?
1093 .expect("session with one malformed part still lands");
1094 let message = session
1095 .messages
1096 .iter()
1097 .find(|stored| stored.message.id() == "msg_badpart")
1098 .expect("message with valid sibling part still lands");
1099 assert!(message.parts.iter().any(|part| {
1100 matches!(
1101 &part.kind,
1102 PartKind::Text { text }
1103 if text.as_deref().map(String::as_str) == Some("valid sibling survives")
1104 )
1105 }));
1106 Ok(())
1107 }
1108
1109 #[test]
1110 fn missing_message_timestamp_uses_session_anchor() -> anyhow::Result<()> {
1111 let session_anchor =
1112 DateTime::parse_from_rfc3339("2026-05-05T12:13:14Z")?.with_timezone(&Utc);
1113 let events = build_message_events(
1114 "ses_anchor",
1115 &json!({"id": "msg_no_time", "role": "user"}),
1116 &[],
1117 session_anchor,
1118 )?;
1119
1120 let IngestEvent::Message(message) = &events[0] else {
1121 panic!("first event is the message");
1122 };
1123 assert_eq!(message.timestamp(), session_anchor);
1124 Ok(())
1125 }
1126
1127 #[test]
1128 fn source_part_without_id_is_schema_error() {
1129 let session_anchor = DateTime::from_timestamp_millis(1_765_000_000_000).unwrap();
1130 let error = build_message_events(
1131 "ses_missing_part_id",
1132 &json!({
1133 "id": "msg_missing_part_id",
1134 "role": "assistant",
1135 "time": { "created": 1_765_000_000_000i64 },
1136 }),
1137 &[json!({"type": "text", "text": "cannot restore its filename"})],
1138 session_anchor,
1139 )
1140 .expect_err("part ids are required for native filename replay");
1141
1142 assert!(error.to_string().contains("part missing `id`"));
1143 }
1144
1145 #[test]
1146 fn read_json_bounds_oversized_string_leaves() -> anyhow::Result<()> {
1147 let temp = TempDir::new()?;
1148 let path = temp.path().join("oversized.json");
1149 write_json_file(
1150 &path,
1151 &json!({
1152 "id": "oversized",
1153 "text": "x".repeat(LEAF_CAP + 100),
1154 }),
1155 )?;
1156
1157 let value = read_json(&path)?;
1158 let text = value
1159 .get("text")
1160 .and_then(Value::as_str)
1161 .expect("text leaf survives as a bounded marker");
1162 assert!(text.len() <= LEAF_CAP);
1163 assert!(text.ends_with(&format!("{} bytes>", LEAF_CAP + 100)));
1164 Ok(())
1165 }
1166
1167 #[tokio::test(flavor = "multi_thread")]
1168 async fn opencode_adapter_ingests_fixture_corpus_into_canonical_shape() -> anyhow::Result<()> {
1169 let temp = TempDir::new()?;
1170 let store = Store::open_local(temp.path()).await?;
1171 let adapter = OpencodeAdapter::new(FIXTURES);
1172
1173 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1174 assert!(summary.accepted() > 0, "ingest must accept rows");
1175 assert_eq!(summary.dropped_events, 0, "no per-event drops expected");
1176 assert_eq!(
1177 summary.dropped_sessions, 0,
1178 "no session-level rejections expected"
1179 );
1180
1181 let (sessions, messages, parts) = store.row_counts().await?;
1182 assert!(sessions > 0, "at least one opencode session");
1183 assert!(messages > 0, "at least one opencode message");
1184 assert!(parts > 0, "at least one opencode Part");
1185
1186 let mut saw_call = false;
1187 let mut saw_result = false;
1188 let mut saw_injected_text = false;
1189 for session_id in store.session_ids().await? {
1190 let session = store
1191 .get_session(&session_id)
1192 .await?
1193 .expect("session round-trips");
1194 assert_eq!(session.session.source_agent, NAME);
1195 assert!(
1196 !(*session.session.project).is_empty(),
1197 "spec.md#model-project-non-empty",
1198 );
1199 for stored in &session.messages {
1200 for part in &stored.parts {
1201 match &part.kind {
1202 PartKind::ToolCall { .. } => saw_call = true,
1203 PartKind::ToolResult { .. } => saw_result = true,
1204 PartKind::Text { .. } if part.provenance == Provenance::Injected => {
1205 saw_injected_text = true;
1206 }
1207 _ => {}
1208 }
1209 }
1210 }
1211 }
1212 assert!(saw_call, "fused tool parts yield ToolCall on the assistant");
1213 assert!(
1214 saw_result,
1215 "fused tool parts split off a ToolResult on a Tool message",
1216 );
1217 assert!(
1218 saw_injected_text,
1219 "spec.md#model-part-provenance: synthetic text parts are injected",
1220 );
1221 Ok(())
1222 }
1223
1224 #[tokio::test(flavor = "multi_thread")]
1228 async fn fused_tool_part_splits_into_call_and_result() -> anyhow::Result<()> {
1229 let temp = TempDir::new()?;
1230 let store = Store::open_local(temp.path()).await?;
1231 let adapter = OpencodeAdapter::new(FIXTURES);
1232 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1233
1234 let mut call_ids = std::collections::HashSet::new();
1235 let mut result_ids = std::collections::HashSet::new();
1236 let mut saw_failure = false;
1237 for session_id in store.session_ids().await? {
1238 let session = store
1239 .get_session(&session_id)
1240 .await?
1241 .expect("session round-trips");
1242 for stored in &session.messages {
1243 for part in &stored.parts {
1244 match &part.kind {
1245 PartKind::ToolCall { call_id, .. } => {
1246 if let Some(id) = call_id.as_deref() {
1247 call_ids.insert(id.clone());
1248 }
1249 }
1250 PartKind::ToolResult {
1251 call_id,
1252 is_failure,
1253 result,
1254 ..
1255 } => {
1256 assert!(
1257 matches!(stored.message, Message::Tool { .. }),
1258 "a ToolResult must live on a Tool-role message",
1259 );
1260 if *is_failure {
1261 saw_failure = true;
1262 assert_ne!(
1263 result,
1264 &Value::Null,
1265 "failed tool results must carry the source error/output payload",
1266 );
1267 }
1268 if let Some(id) = call_id.as_deref() {
1269 result_ids.insert(id.clone());
1270 }
1271 }
1272 _ => {}
1273 }
1274 }
1275 }
1276 }
1277 assert!(!call_ids.is_empty(), "corpus has tool calls");
1278 assert_eq!(
1279 call_ids, result_ids,
1280 "every tool call's id is matched by its split-off result",
1281 );
1282 assert!(
1283 saw_failure,
1284 "fixture has at least one failed opencode tool result"
1285 );
1286 Ok(())
1287 }
1288
1289 #[tokio::test(flavor = "multi_thread")]
1290 async fn foreign_serialization_reparses_as_opencode() -> anyhow::Result<()> {
1291 let temp = TempDir::new()?;
1292 let origin_store = Store::open_local(temp.path().join("origin-store")).await?;
1293 let origin = crate::adapter::PiCodingAgentAdapter::new(
1294 "tests/fixtures/adapter/pi-coding-agent/sessions",
1295 );
1296 ingest_adapter(&origin_store, &origin, &crate::adapter::NoopOracle, |_| {}).await?;
1297 let session_id = origin_store
1298 .session_ids()
1299 .await?
1300 .into_iter()
1301 .next()
1302 .expect("pi fixture has sessions");
1303 let session = origin_store
1304 .get_session(&session_id)
1305 .await?
1306 .expect("fixture session is readable");
1307
1308 let restored_root = temp.path().join("opencode-storage");
1309 crate::adapter::write_restored_files(
1310 &restored_root,
1311 OpencodeFactory.serialize(&session, RestoreFidelity::Foreign)?,
1312 )?;
1313 let restored_store = Store::open_local(temp.path().join("restored-store")).await?;
1314 let summary = ingest_adapter(
1315 &restored_store,
1316 &OpencodeAdapter::new(&restored_root),
1317 &crate::adapter::NoopOracle,
1318 |_| {},
1319 )
1320 .await?;
1321
1322 assert!(summary.accepted() > 0);
1323 assert_eq!(summary.dropped_events, 0);
1324 Ok(())
1325 }
1326
1327 #[test]
1328 fn path_ids_reject_separators_and_traversal() {
1329 let where_ = "session/project/session.json";
1330 assert!(validate_path_id(NAME, "session id", "ses_safe", where_).is_ok());
1331 assert!(validate_path_id(NAME, "session id", "../ses", where_).is_err());
1332 assert!(validate_path_id(NAME, "session id", "/tmp/ses", where_).is_err());
1333 assert!(validate_path_id(NAME, "message id", "msg/a", where_).is_err());
1334 assert!(validate_path_id(NAME, "message id", "msg\\a", where_).is_err());
1335 }
1336
1337 fn append_fresh_opencode_turn(root: &std::path::Path) -> anyhow::Result<()> {
1338 let message_dir = root.join("message").join(FRESH_SESSION_ID);
1339 let part_dir = root.join("part").join(FRESH_MESSAGE_ID);
1340 std::fs::create_dir_all(&message_dir)?;
1341 std::fs::create_dir_all(&part_dir)?;
1342 std::fs::write(
1343 message_dir.join(format!("{FRESH_MESSAGE_ID}.json")),
1344 serde_json::to_vec(&json!({
1345 "id": FRESH_MESSAGE_ID,
1346 "sessionID": FRESH_SESSION_ID,
1347 "role": "user",
1348 "time": { "created": 1759859999000i64 }
1349 }))?,
1350 )?;
1351 std::fs::write(
1352 part_dir.join(format!("{FRESH_PART_ID}.json")),
1353 serde_json::to_vec(&json!({
1354 "id": FRESH_PART_ID,
1355 "sessionID": FRESH_SESSION_ID,
1356 "messageID": FRESH_MESSAGE_ID,
1357 "type": "text",
1358 "text": "fresh opencode text",
1359 "synthetic": false
1360 }))?,
1361 )?;
1362 Ok(())
1363 }
1364
1365 fn write_minimal_session(
1366 root: &std::path::Path,
1367 session_id: &str,
1368 message_id: &str,
1369 ) -> anyhow::Result<()> {
1370 write_json_file(
1371 &root
1372 .join("session")
1373 .join("project")
1374 .join(format!("{session_id}.json")),
1375 &json!({
1376 "id": session_id,
1377 "projectID": "project",
1378 "directory": "/tmp/project",
1379 "time": { "created": 1_765_000_000_000i64, "updated": 1_765_000_000_000i64 },
1380 }),
1381 )?;
1382 write_json_file(
1383 &root
1384 .join("message")
1385 .join(session_id)
1386 .join(format!("{message_id}.json")),
1387 &json!({
1388 "id": message_id,
1389 "sessionID": session_id,
1390 "role": "assistant",
1391 "time": { "created": 1_765_000_000_001i64 },
1392 }),
1393 )?;
1394 std::fs::create_dir_all(root.join("part").join(message_id))?;
1395 Ok(())
1396 }
1397
1398 fn write_json_file(path: &std::path::Path, value: &Value) -> anyhow::Result<()> {
1399 if let Some(parent) = path.parent() {
1400 std::fs::create_dir_all(parent)?;
1401 }
1402 std::fs::write(path, serde_json::to_vec(value)?)?;
1403 Ok(())
1404 }
1405
1406 fn copy_dir(from: &std::path::Path, to: &std::path::Path) -> anyhow::Result<()> {
1407 std::fs::create_dir_all(to)?;
1408 for entry in std::fs::read_dir(from)? {
1409 let entry = entry?;
1410 let source = entry.path();
1411 let target = to.join(entry.file_name());
1412 if entry.file_type()?.is_dir() {
1413 copy_dir(&source, &target)?;
1414 } else {
1415 std::fs::copy(&source, &target)?;
1416 }
1417 }
1418 Ok(())
1419 }
1420}