1use std::path::{Path, PathBuf};
24
25use async_stream::stream;
26use chrono::{DateTime, Utc};
27use serde_json::{Value, json};
28use tokio::sync::mpsc;
29
30use crate::{
31 sessions::IngestEvent,
32 wire::{FileData, Message, Part, PartKind, Provenance, ProviderOptions, Session},
33};
34
35use super::{
36 Adapter, AdapterError, AdapterFactory, AdapterYield, AdapterYieldStream, DiscoverFuture, Env,
37 RestoreFidelity, RestoredFile, SkipOracle, SkipReason, compact_json, config_path,
38 extract::{bound_value, extract_str},
39 jsonl::RECORD_CAP,
40 part_id, part_ordinal, raw_record, source_options, validate_path_id,
41};
42
/// Adapter name. Returned by the factory, stored as `source_agent` on sessions, and
/// passed as the adapter label to every `AdapterError` built in this file.
const NAME: &str = "opencode";

/// Capacity of the bounded channel that carries results from the blocking reader
/// task to the async stream in `events_with`.
const CHANNEL_CAP: usize = 256;
48
49pub struct OpencodeFactory;
52
53impl AdapterFactory for OpencodeFactory {
54 fn name(&self) -> &'static str {
55 NAME
56 }
57
58 fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
59 Ok(Box::new(OpencodeAdapter::new(config_path(NAME, config)?)))
60 }
61
62 fn probe_default(&self, env: &Env) -> Option<Value> {
63 let path = env
64 .home
65 .join(".local")
66 .join("share")
67 .join("opencode")
68 .join("storage");
69 path.exists().then(|| json!({ "path": path }))
70 }
71
72 fn serialize(
73 &self,
74 session: &crate::sessions::SessionWithMessages,
75 fidelity: RestoreFidelity,
76 ) -> Result<Vec<RestoredFile>, AdapterError> {
77 match fidelity {
78 RestoreFidelity::Native => serialize_native(session),
79 RestoreFidelity::Foreign => serialize_foreign(session),
80 }
81 }
82}
83
/// Adapter over one opencode storage directory.
#[derive(Debug, Clone)]
pub struct OpencodeAdapter {
    /// Storage root; the `session`, `message` and `part` directories are resolved
    /// relative to it.
    root: PathBuf,
}

impl OpencodeAdapter {
    /// Creates an adapter rooted at `root`.
    pub fn new(root: impl Into<PathBuf>) -> Self {
        let root = root.into();
        Self { root }
    }
}
95
96impl Adapter for OpencodeAdapter {
97 fn discover(&self) -> DiscoverFuture<'_> {
98 let root = self.root.clone();
99 Box::pin(async move {
100 tokio::task::spawn_blocking(move || {
101 collect_session_files(&root).map(|files| files.len())
102 })
103 .await
104 .map_err(join_error)?
105 })
106 }
107
108 fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
109 let adapter = self.clone();
110 Box::pin(stream! {
111 let files = {
112 let root = adapter.root.clone();
113 tokio::task::spawn_blocking(move || collect_session_files(&root)).await
114 };
115 let files = match files {
116 Ok(Ok(files)) => files,
117 Ok(Err(error)) => { yield Err(error); return; }
118 Err(join) => { yield Err(join_error(join)); return; }
119 };
120
121 let mut survivors = Vec::with_capacity(files.len());
127 for mut file in files {
128 if !oracle.is_empty() {
129 let walked = {
130 let root = adapter.root.clone();
131 let session_path = file.path.clone();
132 let session_id = file.session_id.clone();
133 tokio::task::spawn_blocking(move || {
134 let walk = walk_session_subtree(&root, &session_path, &session_id)?;
135 let last_ts = newest_message_ts(&walk);
136 Ok::<_, AdapterError>((walk, last_ts))
137 })
138 .await
139 };
140 let (walk, last_ts) = match walked {
141 Ok(Ok(pair)) => pair,
142 Ok(Err(error)) => { yield Err(error); return; }
143 Err(join) => { yield Err(join_error(join)); return; }
144 };
145 if crate::adapter::is_session_fresh(oracle, &file.session_id, last_ts) {
146 yield Ok(AdapterYield::Skipped {
147 session_id: Some(file.session_id.clone()),
148 project: None,
149 reason: SkipReason::Fresh,
150 });
151 continue;
152 }
153 file.cached_subtree = Some(walk);
156 }
157 survivors.push(file);
158 }
159
160 let (tx, mut rx) = mpsc::channel(CHANNEL_CAP);
161 let reader = adapter.clone();
162 let handle = tokio::task::spawn_blocking(move || read_sessions(&reader, survivors, &tx));
163 while let Some(item) = rx.recv().await {
164 yield item;
165 }
166 if let Err(join) = handle.await {
167 yield Err(join_error(join));
168 }
169 })
170 }
171}
172
173fn join_error(join: tokio::task::JoinError) -> AdapterError {
176 AdapterError::io(
177 NAME,
178 "blocking read task",
179 std::io::Error::other(join.to_string()),
180 )
181}
182
/// One session JSON file discovered under `<root>/session/<project>/`.
struct SessionFile {
    /// Session id taken from the file stem (validated with `validate_path_id`).
    session_id: String,
    /// Full path of the session JSON file.
    path: PathBuf,
    /// Message/part listing captured by the freshness probe in `events_with`;
    /// `None` when no probe ran, in which case the reader lists the directories itself.
    cached_subtree: Option<SubtreeWalk>,
}
191
/// Directory listing of one session's messages and their parts.
struct SubtreeWalk {
    /// Sorted `.json` files from `<root>/message/<session id>/`.
    message_files: Vec<PathBuf>,
    /// Sorted part files per message, index-aligned with `message_files`
    /// (one entry is pushed for every message file, possibly empty).
    part_files_by_message: Vec<Vec<PathBuf>>,
}
201
202fn collect_session_files(root: &Path) -> Result<Vec<SessionFile>, AdapterError> {
205 let session_root = root.join("session");
206 let io = |path: &Path, source| AdapterError::io(NAME, path.display().to_string(), source);
207 let entries = match std::fs::read_dir(&session_root) {
208 Ok(entries) => entries,
209 Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
210 Err(error) => return Err(io(&session_root, error)),
211 };
212 let mut out = Vec::new();
213 for project in entries {
214 let project = project.map_err(|error| io(&session_root, error))?;
215 if !project
216 .file_type()
217 .map_err(|error| io(&project.path(), error))?
218 .is_dir()
219 {
220 continue;
221 }
222 let project_dir = project.path();
223 for session in std::fs::read_dir(&project_dir).map_err(|error| io(&project_dir, error))? {
224 let session = session.map_err(|error| io(&project_dir, error))?;
225 let path = session.path();
226 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
227 continue;
228 }
229 let Some(session_id) = path
230 .file_stem()
231 .and_then(|s| s.to_str())
232 .map(ToOwned::to_owned)
233 else {
234 continue;
235 };
236 validate_path_id(
237 NAME,
238 "session file name",
239 &session_id,
240 path.display().to_string(),
241 )?;
242 out.push(SessionFile {
243 session_id,
244 path,
245 cached_subtree: None,
246 });
247 }
248 }
249 out.sort_by(|a, b| a.path.cmp(&b.path));
250 Ok(out)
251}
252
253fn walk_session_subtree(
258 root: &Path,
259 _session_path: &Path,
260 session_id: &str,
261) -> Result<SubtreeWalk, AdapterError> {
262 let message_dir = root.join("message").join(session_id);
263 let message_files = list_json_sorted(&message_dir)?;
264 let mut part_files_by_message = Vec::with_capacity(message_files.len());
265 for message_path in &message_files {
266 let Some(message_id) = message_path.file_stem().and_then(|stem| stem.to_str()) else {
267 part_files_by_message.push(Vec::new());
268 continue;
269 };
270 validate_path_id(
271 NAME,
272 "message file name",
273 message_id,
274 message_path.display().to_string(),
275 )?;
276 let part_dir = root.join("part").join(message_id);
277 let parts = list_json_sorted(&part_dir)?;
278 part_files_by_message.push(parts);
279 }
280 Ok(SubtreeWalk {
281 message_files,
282 part_files_by_message,
283 })
284}
285
286fn newest_message_ts(walk: &SubtreeWalk) -> Option<i64> {
293 let message_path = walk.message_files.last()?;
294 let message = read_json(message_path).ok()?;
295 let mut newest = millis_at(&message, &["time", "created"])?;
296 if let Some(parts) = walk.part_files_by_message.last() {
297 for part_path in parts {
298 if let Ok(part) = read_json(part_path)
299 && let Some(end) = millis_at(&part, &["state", "time", "end"])
300 {
301 newest = newest.max(end);
302 }
303 }
304 }
305 Some(newest.timestamp_micros())
306}
307
308fn read_sessions(
309 adapter: &OpencodeAdapter,
310 sessions: Vec<SessionFile>,
311 tx: &mpsc::Sender<Result<AdapterYield, AdapterError>>,
312) {
313 for session in sessions {
314 if !read_one_session(adapter, session, tx) {
315 return;
316 }
317 }
318}
319
320fn read_one_session(
322 adapter: &OpencodeAdapter,
323 file: SessionFile,
324 tx: &mpsc::Sender<Result<AdapterYield, AdapterError>>,
325) -> bool {
326 macro_rules! emit {
327 ($item:expr) => {
328 if tx.blocking_send($item).is_err() {
329 return false;
330 }
331 };
332 }
333
334 let session_value = match read_json(&file.path) {
335 Ok(value) => value,
336 Err(error) => {
337 emit!(Err(error));
338 return true;
339 }
340 };
341 let session = match session_from_value(&session_value, &file.path) {
342 Ok(session) => session,
343 Err(error) => {
344 emit!(Err(error));
345 return true;
346 }
347 };
348 let session_id = session.id.clone();
349 if let Err(error) = validate_path_id(
350 NAME,
351 "session id",
352 &session_id,
353 file.path.display().to_string(),
354 ) {
355 emit!(Err(error));
356 return true;
357 }
358 let session_created_at = session.created_at;
359 emit!(Ok(AdapterYield::Event(IngestEvent::Session(session))));
360
361 let (message_files, mut part_files_by_message) = match file.cached_subtree {
363 Some(walk) => (walk.message_files, walk.part_files_by_message),
364 None => {
365 let message_dir = adapter.root.join("message").join(&session_id);
366 let files = match list_json_sorted(&message_dir) {
367 Ok(files) => files,
368 Err(error) => {
369 emit!(Err(error));
370 return true;
371 }
372 };
373 (files, Vec::new())
374 }
375 };
376 let use_cache = !part_files_by_message.is_empty();
377
378 for (index, message_path) in message_files.iter().enumerate() {
379 let message_value = match read_json(message_path) {
380 Ok(value) => value,
381 Err(error) => {
382 emit!(Err(error));
383 continue;
384 }
385 };
386 let Some(message_id) = message_value.get("id").and_then(Value::as_str) else {
387 emit!(Err(AdapterError::schema(
388 NAME,
389 message_path.display().to_string(),
390 "message file missing `id`",
391 )));
392 continue;
393 };
394 if let Err(error) = validate_path_id(
395 NAME,
396 "message id",
397 message_id,
398 message_path.display().to_string(),
399 ) {
400 emit!(Err(error));
401 continue;
402 }
403 let part_files = if use_cache {
404 std::mem::take(&mut part_files_by_message[index])
405 } else {
406 let part_dir = adapter.root.join("part").join(message_id);
407 match list_json_sorted(&part_dir) {
408 Ok(files) => files,
409 Err(error) => {
410 emit!(Err(error));
411 continue;
412 }
413 }
414 };
415 let mut parts = Vec::with_capacity(part_files.len());
416 for part_path in part_files {
417 match read_json(&part_path) {
418 Ok(value) => parts.push(value),
419 Err(error) => emit!(Err(error)),
420 }
421 }
422 match build_message_events(&session_id, &message_value, &parts, session_created_at) {
423 Ok(events) => {
424 for event in events {
425 emit!(Ok(AdapterYield::Event(event)));
426 }
427 }
428 Err(error) => emit!(Err(error)),
429 }
430 }
431 true
432}
433
434fn read_json(path: &Path) -> Result<Value, AdapterError> {
439 use std::io::Read;
440 let io = |source| AdapterError::io(NAME, path.display().to_string(), source);
441 let mut file = std::fs::File::open(path).map_err(io)?;
442 let len = file.metadata().map_err(io)?.len();
443 if len > RECORD_CAP as u64 {
444 return Err(AdapterError::schema(
445 NAME,
446 path.display().to_string(),
447 format!("json file exceeds adapter record cap: {len} bytes > {RECORD_CAP}"),
448 ));
449 }
450 let mut bytes = Vec::with_capacity(len as usize);
451 file.read_to_end(&mut bytes).map_err(io)?;
452 let mut value: Value = serde_json::from_slice(&bytes)
453 .map_err(|error| AdapterError::parse(NAME, path.display().to_string(), 1, error))?;
454 bound_value(&mut value);
455 Ok(value)
456}
457
458fn list_json_sorted(dir: &Path) -> Result<Vec<PathBuf>, AdapterError> {
462 let entries = match std::fs::read_dir(dir) {
463 Ok(entries) => entries,
464 Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
465 Err(error) => return Err(AdapterError::io(NAME, dir.display().to_string(), error)),
466 };
467 let mut out = Vec::new();
468 for entry in entries {
469 let entry =
470 entry.map_err(|error| AdapterError::io(NAME, dir.display().to_string(), error))?;
471 let path = entry.path();
472 if path.extension().and_then(|ext| ext.to_str()) == Some("json") {
473 out.push(path);
474 }
475 }
476 out.sort();
477 Ok(out)
478}
479
480fn session_from_value(value: &Value, path: &Path) -> Result<Session, AdapterError> {
481 let display = path.display().to_string();
482 let id = value
483 .get("id")
484 .and_then(Value::as_str)
485 .ok_or_else(|| AdapterError::schema(NAME, display.clone(), "session missing `id`"))?
486 .to_owned();
487 let created_at = millis_at(value, &["time", "created"]).ok_or_else(|| {
488 AdapterError::schema(NAME, display.clone(), "session missing `time.created`")
489 })?;
490 let project = extract_str(value, "directory")
493 .ok_or_else(|| AdapterError::schema(NAME, display, "session missing `directory`"))?;
494
495 let options = opencode_raw(value);
496
497 Ok(Session {
498 id,
499 parent_session_id: value
502 .get("parentID")
503 .and_then(Value::as_str)
504 .map(ToOwned::to_owned),
505 parent_message_id: None,
506 source_agent: NAME.to_owned(),
507 created_at,
508 project,
509 options,
510 })
511}
512
513fn build_message_events(
517 session_id: &str,
518 message_value: &Value,
519 part_values: &[Value],
520 default_timestamp: DateTime<Utc>,
521) -> Result<Vec<IngestEvent>, AdapterError> {
522 let message_id = message_value
523 .get("id")
524 .and_then(Value::as_str)
525 .ok_or_else(|| AdapterError::schema(NAME, session_id.to_owned(), "message missing `id`"))?;
526 let role = message_value.get("role").and_then(Value::as_str);
527 let timestamp = millis_at(message_value, &["time", "created"]).unwrap_or(default_timestamp);
528
529 let options = opencode_raw(message_value);
530 let message = match role {
531 Some("user") => Message::User {
532 id: message_id.to_owned(),
533 session_id: session_id.to_owned(),
534 timestamp,
535 options,
536 },
537 Some("assistant") => Message::Assistant {
538 id: message_id.to_owned(),
539 session_id: session_id.to_owned(),
540 timestamp,
541 options,
542 },
543 _ => Message::System {
548 id: message_id.to_owned(),
549 session_id: session_id.to_owned(),
550 timestamp,
551 content: extract_str(message_value, "role"),
552 options,
553 },
554 };
555
556 let mut events = vec![IngestEvent::Message(message)];
557 let mut deferred = Vec::new();
558 for (ordinal, part_value) in part_values.iter().enumerate() {
559 let mapped = map_part(session_id, message_id, ordinal, part_value, timestamp)?;
560 events.push(IngestEvent::Part(mapped.part));
561 if let Some(split) = mapped.tool_split {
562 deferred.push(split);
563 }
564 }
565 for ToolSplit {
566 message: tool_message,
567 result,
568 } in deferred
569 {
570 events.push(IngestEvent::Message(tool_message));
571 events.push(IngestEvent::Part(result));
572 }
573 Ok(events)
574}
575
/// Result of mapping one source part record.
struct MappedPart {
    /// The part that belongs to the source message.
    part: Part,
    /// For tool parts, the synthetic tool message and result part split off from
    /// the record; `None` for every other part type.
    tool_split: Option<ToolSplit>,
}
582
/// Tool message and tool-result part derived from a single source tool part.
struct ToolSplit {
    /// The `Message::Tool` that owns `result`.
    message: Message,
    /// The `PartKind::ToolResult` part attached to `message`.
    result: Part,
}
587
588fn map_part(
589 session_id: &str,
590 message_id: &str,
591 ordinal: usize,
592 value: &Value,
593 message_ts: DateTime<Utc>,
594) -> Result<MappedPart, AdapterError> {
595 let kind = value.get("type").and_then(Value::as_str);
596 let id = value
597 .get("id")
598 .and_then(Value::as_str)
599 .ok_or_else(|| AdapterError::schema(NAME, message_id.to_owned(), "part missing `id`"))?
600 .to_owned();
601
602 if kind == Some("tool") {
603 return Ok(tool_part(
604 session_id, message_id, &id, ordinal, value, message_ts,
605 ));
606 }
607
608 let (provenance, part_kind) = match kind {
609 Some("text") => (text_provenance(value), text_kind(value)),
610 Some("reasoning") => (Provenance::Conversational, reasoning_kind(value)),
611 Some("file") => (Provenance::Conversational, file_kind(value)),
612 _ => (Provenance::Injected, PartKind::Text { text: None }),
616 };
617
618 Ok(MappedPart {
619 part: Part {
620 session_id: session_id.to_owned(),
621 id,
622 message_id: message_id.to_owned(),
623 ordinal: part_ordinal(ordinal),
624 provenance,
625 options: opencode_raw(value),
626 kind: part_kind,
627 },
628 tool_split: None,
629 })
630}
631
632fn text_provenance(value: &Value) -> Provenance {
636 if value.get("synthetic").and_then(Value::as_bool) == Some(true) {
637 Provenance::Injected
638 } else {
639 Provenance::Conversational
640 }
641}
642
643fn text_kind(value: &Value) -> PartKind {
644 PartKind::Text {
645 text: extract_str(value, "text"),
646 }
647}
648
649fn reasoning_kind(value: &Value) -> PartKind {
650 PartKind::Reasoning {
651 text: extract_str(value, "text"),
652 }
653}
654
655fn file_kind(value: &Value) -> PartKind {
656 let media_type = value
659 .get("mime")
660 .and_then(Value::as_str)
661 .map(ToOwned::to_owned);
662 let file_name = value
663 .get("filename")
664 .and_then(Value::as_str)
665 .map(ToOwned::to_owned);
666 let data = match value.get("url").and_then(Value::as_str) {
667 Some(url) => FileData::Url(url.to_owned()),
668 None => FileData::String(compact_json(value)),
669 };
670 PartKind::File {
671 media_type,
672 file_name,
673 data,
674 }
675}
676
677fn tool_part(
683 session_id: &str,
684 message_id: &str,
685 id: &str,
686 ordinal: usize,
687 value: &Value,
688 message_ts: DateTime<Utc>,
689) -> MappedPart {
690 let call_id = extract_str(value, "callID");
691 let name = extract_str(value, "tool");
692 let state = value.get("state");
693 let status = state.and_then(|s| s.get("status")).and_then(Value::as_str);
694 let result_ts = millis_at(value, &["state", "time", "end"]).unwrap_or(message_ts);
695
696 let mut owned_state = state.cloned().unwrap_or(Value::Null);
701 let (input, result) = match owned_state.as_object_mut() {
702 Some(map) => {
703 let input = map.remove("input").unwrap_or(Value::Null);
704 let result = map
705 .remove("output")
706 .or_else(|| map.remove("error"))
707 .unwrap_or_else(|| {
708 std::mem::take(&mut owned_state)
710 });
711 (input, result)
712 }
713 None => (Value::Null, Value::Null),
714 };
715
716 let tool_call = Part {
717 session_id: session_id.to_owned(),
718 id: id.to_owned(),
719 message_id: message_id.to_owned(),
720 ordinal: part_ordinal(ordinal),
721 provenance: Provenance::Conversational,
723 options: opencode_raw(value),
724 kind: PartKind::ToolCall {
725 call_id: call_id.clone(),
726 name: name.clone(),
727 params: input,
728 provider_executed: false,
729 },
730 };
731
732 let tool_message_id = format!("{id}/result");
733 let tool_message = Message::Tool {
734 id: tool_message_id.clone(),
735 session_id: session_id.to_owned(),
736 timestamp: result_ts,
737 options: synthetic_options(),
738 };
739 let result_part = Part {
740 session_id: session_id.to_owned(),
741 id: part_id(&tool_message_id, 0),
742 message_id: tool_message_id,
743 ordinal: 0,
744 provenance: Provenance::Injected,
746 options: synthetic_options(),
747 kind: PartKind::ToolResult {
748 call_id,
749 name,
750 is_failure: status == Some("error"),
751 result,
752 },
753 };
754
755 MappedPart {
756 part: tool_call,
757 tool_split: Some(ToolSplit {
758 message: tool_message,
759 result: result_part,
760 }),
761 }
762}
763
/// Builds the `ProviderOptions` for a source record by calling `source_options`
/// with this adapter's `NAME`.
#[inline]
fn opencode_raw(value: &Value) -> ProviderOptions {
    source_options(NAME, value)
}
768
769fn synthetic_options() -> ProviderOptions {
773 let mut options = ProviderOptions::new();
774 options.insert("opencode".to_owned(), json!({ "synthetic": true }));
775 options
776}
777
778fn millis_at(value: &Value, path: &[&str]) -> Option<DateTime<Utc>> {
779 let mut cursor = value;
780 for key in path {
781 cursor = cursor.get(key)?;
782 }
783 DateTime::from_timestamp_millis(cursor.as_i64()?)
784}
785
786fn is_synthetic(options: &ProviderOptions) -> bool {
787 options
788 .get("opencode")
789 .and_then(|o| o.get("synthetic"))
790 .and_then(Value::as_bool)
791 == Some(true)
792}
793
794fn serialize_native(
795 session: &crate::sessions::SessionWithMessages,
796) -> Result<Vec<RestoredFile>, AdapterError> {
797 let Some(session_raw) = raw_record(&session.session.options) else {
808 return serialize_foreign(session);
809 };
810 let project_id = session_raw
811 .get("projectID")
812 .and_then(Value::as_str)
813 .ok_or_else(|| {
814 AdapterError::schema(
815 NAME,
816 session.session.id.clone(),
817 "stored session raw_record missing projectID",
818 )
819 })?;
820
821 let mut files = vec![RestoredFile::new(
822 PathBuf::from("session")
823 .join(project_id)
824 .join(format!("{}.json", session.session.id)),
825 encode(&session_raw, &session.session.id)?,
826 RestoreFidelity::Native,
827 )];
828
829 for message in &session.messages {
830 if !is_synthetic(message.message.options())
831 && let Some(raw) = raw_record(message.message.options())
832 {
833 files.push(RestoredFile::new(
834 PathBuf::from("message")
835 .join(&session.session.id)
836 .join(format!("{}.json", message.message.id())),
837 encode(&raw, message.message.id())?,
838 RestoreFidelity::Native,
839 ));
840 }
841 for part in &message.parts {
842 if let Some(raw) = raw_record(&part.options) {
845 files.push(RestoredFile::new(
846 PathBuf::from("part")
847 .join(&part.message_id)
848 .join(format!("{}.json", part.id)),
849 encode(&raw, &part.id)?,
850 RestoreFidelity::Native,
851 ));
852 }
853 }
854 }
855 Ok(files)
856}
857
858fn serialize_foreign(
859 session: &crate::sessions::SessionWithMessages,
860) -> Result<Vec<RestoredFile>, AdapterError> {
861 let project_id = encode_project(&session.session.project);
867 let created = session.session.created_at.timestamp_millis();
868 let session_record = json!({
869 "id": session.session.id,
870 "projectID": project_id,
871 "directory": &*session.session.project,
872 "time": { "created": created, "updated": created },
873 });
874 let mut files = vec![RestoredFile::new(
875 PathBuf::from("session")
876 .join(&project_id)
877 .join(format!("{}.json", session.session.id)),
878 encode(&session_record, &session.session.id)?,
879 RestoreFidelity::Foreign,
880 )];
881
882 for message in &session.messages {
883 let role = match message.message {
884 Message::User { .. } => "user",
885 Message::Assistant { .. } => "assistant",
886 Message::Tool { .. } | Message::System { .. } => continue,
888 };
889 let created = message.message.timestamp().timestamp_millis();
890 let record = json!({
891 "id": message.message.id(),
892 "sessionID": session.session.id,
893 "role": role,
894 "time": { "created": created },
895 });
896 files.push(RestoredFile::new(
897 PathBuf::from("message")
898 .join(&session.session.id)
899 .join(format!("{}.json", message.message.id())),
900 encode(&record, message.message.id())?,
901 RestoreFidelity::Foreign,
902 ));
903 for part in &message.parts {
904 let Some(record) = foreign_part(&session.session.id, part) else {
905 continue;
906 };
907 files.push(RestoredFile::new(
908 PathBuf::from("part")
909 .join(message.message.id())
910 .join(format!("{}.json", part.id)),
911 encode(&record, &part.id)?,
912 RestoreFidelity::Foreign,
913 ));
914 }
915 }
916 Ok(files)
917}
918
919fn foreign_part(session_id: &str, part: &Part) -> Option<Value> {
920 let mut record = match &part.kind {
921 PartKind::Text { text } => json!({
922 "type": "text",
923 "text": text.as_deref().map(|t| &**t),
924 "synthetic": part.provenance == Provenance::Injected,
925 }),
926 PartKind::Reasoning { text } => json!({
927 "type": "reasoning",
928 "text": text.as_deref().map(|t| &**t),
929 }),
930 PartKind::File {
931 media_type,
932 file_name,
933 data,
934 } => json!({
935 "type": "file",
936 "mime": media_type,
937 "filename": file_name,
938 "url": match data {
939 FileData::Url(url) => Some(url.clone()),
940 _ => None,
941 },
942 }),
943 PartKind::ToolCall {
944 call_id,
945 name,
946 params,
947 ..
948 } => json!({
949 "type": "tool",
950 "callID": call_id.as_deref().map(|c| &**c),
951 "tool": name.as_deref().map(|n| &**n),
952 "state": { "status": "completed", "input": params },
953 }),
954 _ => return None,
956 };
957 if let Value::Object(map) = &mut record {
958 map.insert("id".to_owned(), json!(part.id));
959 map.insert("sessionID".to_owned(), json!(session_id));
960 map.insert("messageID".to_owned(), json!(part.message_id));
961 }
962 Some(record)
963}
964
965fn encode(value: &Value, location: &str) -> Result<Vec<u8>, AdapterError> {
966 serde_json::to_vec(value).map_err(|error| {
967 AdapterError::schema(
968 NAME,
969 location.to_owned(),
970 format!("json encode failed: {error}"),
971 )
972 })
973}
974
/// Derives a project id from a project string: ASCII letters and digits are kept,
/// every other character (including non-ASCII ones) becomes `-`.
fn encode_project(project: &str) -> String {
    let mut encoded = String::with_capacity(project.len());
    for ch in project.chars() {
        encoded.push(if ch.is_ascii_alphanumeric() { ch } else { '-' });
    }
    encoded
}
981
982#[cfg(test)]
983mod tests {
984 #![allow(clippy::expect_used, clippy::unwrap_used)]
989
990 use super::*;
991 use crate::{
992 adapter::extract::LEAF_CAP, handlers::ingest_adapter, sessions::Store, wire::PartKind,
993 };
994 use tempfile::TempDir;
995
996 const FIXTURES: &str = concat!(
999 env!("CARGO_MANIFEST_DIR"),
1000 "/tests/fixtures/adapter/opencode/storage"
1001 );
1002 const FRESH_SESSION_ID: &str = "ses_6405e5a5cffeIG2QHRuTmm4mA7";
1003 const FRESH_MESSAGE_ID: &str = "msg_zzzzfresh0001";
1004 const FRESH_PART_ID: &str = "prt_zzzzfresh0001";
1005
1006 struct FixedOracle {
1007 session_id: &'static str,
1008 watermark_micros: i64,
1009 }
1010
1011 impl crate::adapter::SkipOracle for FixedOracle {
1012 fn session_max_ts(&self, session_id: &str) -> Option<i64> {
1013 (session_id == self.session_id).then_some(self.watermark_micros)
1014 }
1015 }
1016
1017 #[test]
1018 fn probe_default_finds_opencode_storage_under_home() -> anyhow::Result<()> {
1019 crate::adapter::test_support::assert_probe_default(
1020 &OpencodeFactory,
1021 &[".local", "share", "opencode", "storage"],
1022 )
1023 }
1024
1025 #[tokio::test(flavor = "multi_thread")]
1026 async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
1027 let adapter = OpencodeAdapter::new(FIXTURES);
1028 crate::adapter::test_support::assert_native_restore(
1029 &OpencodeFactory,
1030 &adapter,
1031 std::path::Path::new(FIXTURES),
1033 )
1034 .await
1035 }
1036
1037 const FRESH_TURN_MICROS: i64 = 1_759_859_999_000 * 1_000;
1040
1041 #[tokio::test(flavor = "multi_thread")]
1044 async fn freshness_re_reads_a_session_that_gained_a_newer_message() -> anyhow::Result<()> {
1045 let temp = TempDir::new()?;
1046 let source = temp.path().join("storage");
1047 copy_dir(std::path::Path::new(FIXTURES), &source)?;
1048
1049 let store = Store::open_local(temp.path().join("store")).await?;
1050 let adapter = OpencodeAdapter::new(&source);
1051 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1052
1053 append_fresh_opencode_turn(&source)?;
1054 let oracle = FixedOracle {
1056 session_id: FRESH_SESSION_ID,
1057 watermark_micros: FRESH_TURN_MICROS - 1,
1058 };
1059 ingest_adapter(&store, &adapter, &oracle, |_| {}).await?;
1060
1061 let session = store
1062 .get_session(FRESH_SESSION_ID)
1063 .await?
1064 .expect("fixture session round-trips");
1065 let fresh = session
1066 .messages
1067 .iter()
1068 .find(|stored| stored.message.id() == FRESH_MESSAGE_ID)
1069 .expect("message newer than the watermark must land");
1070 assert!(
1071 fresh.parts.iter().any(|part| matches!(
1072 &part.kind,
1073 PartKind::Text { text } if text.as_deref().map(|value| value.as_str()) == Some("fresh opencode text")
1074 )),
1075 "fresh message part must land with the re-read session",
1076 );
1077 Ok(())
1078 }
1079
1080 #[tokio::test(flavor = "multi_thread")]
1083 async fn freshness_skips_a_session_not_newer_than_the_watermark() -> anyhow::Result<()> {
1084 let temp = TempDir::new()?;
1085 let source = temp.path().join("storage");
1086 copy_dir(std::path::Path::new(FIXTURES), &source)?;
1087
1088 let store = Store::open_local(temp.path().join("store")).await?;
1089 let adapter = OpencodeAdapter::new(&source);
1090 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1091
1092 append_fresh_opencode_turn(&source)?;
1093 let oracle = FixedOracle {
1095 session_id: FRESH_SESSION_ID,
1096 watermark_micros: FRESH_TURN_MICROS,
1097 };
1098 let summary = ingest_adapter(&store, &adapter, &oracle, |_| {}).await?;
1099
1100 assert!(
1101 summary.skipped_fresh >= 1,
1102 "the unchanged-vs-watermark session must be skipped, got {summary:?}",
1103 );
1104 let session = store
1105 .get_session(FRESH_SESSION_ID)
1106 .await?
1107 .expect("fixture session round-trips");
1108 assert!(
1109 !session
1110 .messages
1111 .iter()
1112 .any(|stored| stored.message.id() == FRESH_MESSAGE_ID),
1113 "a skipped session must not re-read the appended turn",
1114 );
1115 Ok(())
1116 }
1117
1118 #[tokio::test(flavor = "multi_thread")]
1119 async fn malformed_part_file_drops_only_that_part() -> anyhow::Result<()> {
1120 let temp = TempDir::new()?;
1121 let source = temp.path().join("storage");
1122 write_minimal_session(&source, "ses_badpart", "msg_badpart")?;
1123 let part_dir = source.join("part").join("msg_badpart");
1124 std::fs::write(part_dir.join("prt_000_bad.json"), b"{not json")?;
1125 write_json_file(
1126 &part_dir.join("prt_999_good.json"),
1127 &json!({
1128 "id": "prt_999_good",
1129 "sessionID": "ses_badpart",
1130 "messageID": "msg_badpart",
1131 "type": "text",
1132 "text": "valid sibling survives",
1133 "synthetic": false,
1134 }),
1135 )?;
1136
1137 let store = Store::open_local(temp.path().join("store")).await?;
1138 let summary = ingest_adapter(
1139 &store,
1140 &OpencodeAdapter::new(&source),
1141 &crate::adapter::NoopOracle,
1142 |_| {},
1143 )
1144 .await?;
1145
1146 assert_eq!(summary.dropped_events, 1);
1147 let session = store
1148 .get_session("ses_badpart")
1149 .await?
1150 .expect("session with one malformed part still lands");
1151 let message = session
1152 .messages
1153 .iter()
1154 .find(|stored| stored.message.id() == "msg_badpart")
1155 .expect("message with valid sibling part still lands");
1156 assert!(message.parts.iter().any(|part| {
1157 matches!(
1158 &part.kind,
1159 PartKind::Text { text }
1160 if text.as_deref().map(String::as_str) == Some("valid sibling survives")
1161 )
1162 }));
1163 Ok(())
1164 }
1165
1166 #[test]
1167 fn missing_message_timestamp_uses_session_anchor() -> anyhow::Result<()> {
1168 let session_anchor =
1169 DateTime::parse_from_rfc3339("2026-05-05T12:13:14Z")?.with_timezone(&Utc);
1170 let events = build_message_events(
1171 "ses_anchor",
1172 &json!({"id": "msg_no_time", "role": "user"}),
1173 &[],
1174 session_anchor,
1175 )?;
1176
1177 let IngestEvent::Message(message) = &events[0] else {
1178 panic!("first event is the message");
1179 };
1180 assert_eq!(message.timestamp(), session_anchor);
1181 Ok(())
1182 }
1183
1184 #[test]
1185 fn source_part_without_id_is_schema_error() {
1186 let session_anchor = DateTime::from_timestamp_millis(1_765_000_000_000).unwrap();
1187 let error = build_message_events(
1188 "ses_missing_part_id",
1189 &json!({
1190 "id": "msg_missing_part_id",
1191 "role": "assistant",
1192 "time": { "created": 1_765_000_000_000i64 },
1193 }),
1194 &[json!({"type": "text", "text": "cannot restore its filename"})],
1195 session_anchor,
1196 )
1197 .expect_err("part ids are required for native filename replay");
1198
1199 assert!(error.to_string().contains("part missing `id`"));
1200 }
1201
1202 #[test]
1203 fn read_json_bounds_oversized_string_leaves() -> anyhow::Result<()> {
1204 let temp = TempDir::new()?;
1205 let path = temp.path().join("oversized.json");
1206 write_json_file(
1207 &path,
1208 &json!({
1209 "id": "oversized",
1210 "text": "x".repeat(LEAF_CAP + 100),
1211 }),
1212 )?;
1213
1214 let value = read_json(&path)?;
1215 let text = value
1216 .get("text")
1217 .and_then(Value::as_str)
1218 .expect("text leaf survives as a bounded marker");
1219 assert!(text.len() <= LEAF_CAP);
1220 assert!(text.ends_with(&format!("{} bytes>", LEAF_CAP + 100)));
1221 Ok(())
1222 }
1223
1224 #[tokio::test(flavor = "multi_thread")]
1225 async fn opencode_adapter_ingests_fixture_corpus_into_canonical_shape() -> anyhow::Result<()> {
1226 let temp = TempDir::new()?;
1227 let store = Store::open_local(temp.path()).await?;
1228 let adapter = OpencodeAdapter::new(FIXTURES);
1229
1230 let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1231 assert!(summary.accepted() > 0, "ingest must accept rows");
1232 assert_eq!(summary.dropped_events, 0, "no per-event drops expected");
1233 assert_eq!(
1234 summary.dropped_sessions, 0,
1235 "no session-level rejections expected"
1236 );
1237
1238 let (sessions, messages, parts) = store.row_counts().await?;
1239 assert!(sessions > 0, "at least one opencode session");
1240 assert!(messages > 0, "at least one opencode message");
1241 assert!(parts > 0, "at least one opencode Part");
1242
1243 let mut saw_call = false;
1244 let mut saw_result = false;
1245 let mut saw_injected_text = false;
1246 for session_id in store.session_ids().await? {
1247 let session = store
1248 .get_session(&session_id)
1249 .await?
1250 .expect("session round-trips");
1251 assert_eq!(session.session.source_agent, NAME);
1252 assert!(
1253 !(*session.session.project).is_empty(),
1254 "spec.md#model-project-non-empty",
1255 );
1256 for stored in &session.messages {
1257 for part in &stored.parts {
1258 match &part.kind {
1259 PartKind::ToolCall { .. } => saw_call = true,
1260 PartKind::ToolResult { .. } => saw_result = true,
1261 PartKind::Text { .. } if part.provenance == Provenance::Injected => {
1262 saw_injected_text = true;
1263 }
1264 _ => {}
1265 }
1266 }
1267 }
1268 }
1269 assert!(saw_call, "fused tool parts yield ToolCall on the assistant");
1270 assert!(
1271 saw_result,
1272 "fused tool parts split off a ToolResult on a Tool message",
1273 );
1274 assert!(
1275 saw_injected_text,
1276 "spec.md#model-part-provenance: synthetic text parts are injected",
1277 );
1278 Ok(())
1279 }
1280
1281 #[tokio::test(flavor = "multi_thread")]
1285 async fn fused_tool_part_splits_into_call_and_result() -> anyhow::Result<()> {
1286 let temp = TempDir::new()?;
1287 let store = Store::open_local(temp.path()).await?;
1288 let adapter = OpencodeAdapter::new(FIXTURES);
1289 ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1290
1291 let mut call_ids = std::collections::HashSet::new();
1292 let mut result_ids = std::collections::HashSet::new();
1293 let mut saw_failure = false;
1294 for session_id in store.session_ids().await? {
1295 let session = store
1296 .get_session(&session_id)
1297 .await?
1298 .expect("session round-trips");
1299 for stored in &session.messages {
1300 for part in &stored.parts {
1301 match &part.kind {
1302 PartKind::ToolCall { call_id, .. } => {
1303 if let Some(id) = call_id.as_deref() {
1304 call_ids.insert(id.clone());
1305 }
1306 }
1307 PartKind::ToolResult {
1308 call_id,
1309 is_failure,
1310 result,
1311 ..
1312 } => {
1313 assert!(
1314 matches!(stored.message, Message::Tool { .. }),
1315 "a ToolResult must live on a Tool-role message",
1316 );
1317 if *is_failure {
1318 saw_failure = true;
1319 assert_ne!(
1320 result,
1321 &Value::Null,
1322 "failed tool results must carry the source error/output payload",
1323 );
1324 }
1325 if let Some(id) = call_id.as_deref() {
1326 result_ids.insert(id.clone());
1327 }
1328 }
1329 _ => {}
1330 }
1331 }
1332 }
1333 }
1334 assert!(!call_ids.is_empty(), "corpus has tool calls");
1335 assert_eq!(
1336 call_ids, result_ids,
1337 "every tool call's id is matched by its split-off result",
1338 );
1339 assert!(
1340 saw_failure,
1341 "fixture has at least one failed opencode tool result"
1342 );
1343 Ok(())
1344 }
1345
1346 #[tokio::test(flavor = "multi_thread")]
1347 async fn foreign_serialization_reparses_as_opencode() -> anyhow::Result<()> {
1348 let temp = TempDir::new()?;
1349 let origin_store = Store::open_local(temp.path().join("origin-store")).await?;
1350 let origin = crate::adapter::PiCodingAgentAdapter::new(concat!(
1351 env!("CARGO_MANIFEST_DIR"),
1352 "/tests/fixtures/adapter/pi-coding-agent/sessions"
1353 ));
1354 ingest_adapter(&origin_store, &origin, &crate::adapter::NoopOracle, |_| {}).await?;
1355 let session_id = origin_store
1356 .session_ids()
1357 .await?
1358 .into_iter()
1359 .next()
1360 .expect("pi fixture has sessions");
1361 let session = origin_store
1362 .get_session(&session_id)
1363 .await?
1364 .expect("fixture session is readable");
1365
1366 let restored_root = temp.path().join("opencode-storage");
1367 crate::adapter::write_restored_files(
1368 &restored_root,
1369 OpencodeFactory.serialize(&session, RestoreFidelity::Foreign)?,
1370 )?;
1371 let restored_store = Store::open_local(temp.path().join("restored-store")).await?;
1372 let summary = ingest_adapter(
1373 &restored_store,
1374 &OpencodeAdapter::new(&restored_root),
1375 &crate::adapter::NoopOracle,
1376 |_| {},
1377 )
1378 .await?;
1379
1380 assert!(summary.accepted() > 0);
1381 assert_eq!(summary.dropped_events, 0);
1382 Ok(())
1383 }
1384
/// `validate_path_id` accepts a plain identifier and rejects ids containing a
/// parent-directory prefix, a leading slash, a forward slash, or a backslash.
#[test]
fn path_ids_reject_separators_and_traversal() {
    let where_ = "session/project/session.json";

    assert!(validate_path_id(NAME, "session id", "ses_safe", where_).is_ok());

    let rejected = [
        ("session id", "../ses"),
        ("session id", "/tmp/ses"),
        ("message id", "msg/a"),
        ("message id", "msg\\a"),
    ];
    for (label, candidate) in rejected {
        assert!(validate_path_id(NAME, label, candidate, where_).is_err());
    }
}
1394
1395 fn append_fresh_opencode_turn(root: &std::path::Path) -> anyhow::Result<()> {
1396 let message_dir = root.join("message").join(FRESH_SESSION_ID);
1397 let part_dir = root.join("part").join(FRESH_MESSAGE_ID);
1398 std::fs::create_dir_all(&message_dir)?;
1399 std::fs::create_dir_all(&part_dir)?;
1400 std::fs::write(
1401 message_dir.join(format!("{FRESH_MESSAGE_ID}.json")),
1402 serde_json::to_vec(&json!({
1403 "id": FRESH_MESSAGE_ID,
1404 "sessionID": FRESH_SESSION_ID,
1405 "role": "user",
1406 "time": { "created": 1759859999000i64 }
1407 }))?,
1408 )?;
1409 std::fs::write(
1410 part_dir.join(format!("{FRESH_PART_ID}.json")),
1411 serde_json::to_vec(&json!({
1412 "id": FRESH_PART_ID,
1413 "sessionID": FRESH_SESSION_ID,
1414 "messageID": FRESH_MESSAGE_ID,
1415 "type": "text",
1416 "text": "fresh opencode text",
1417 "synthetic": false
1418 }))?,
1419 )?;
1420 Ok(())
1421 }
1422
1423 fn write_minimal_session(
1424 root: &std::path::Path,
1425 session_id: &str,
1426 message_id: &str,
1427 ) -> anyhow::Result<()> {
1428 write_json_file(
1429 &root
1430 .join("session")
1431 .join("project")
1432 .join(format!("{session_id}.json")),
1433 &json!({
1434 "id": session_id,
1435 "projectID": "project",
1436 "directory": "/tmp/project",
1437 "time": { "created": 1_765_000_000_000i64, "updated": 1_765_000_000_000i64 },
1438 }),
1439 )?;
1440 write_json_file(
1441 &root
1442 .join("message")
1443 .join(session_id)
1444 .join(format!("{message_id}.json")),
1445 &json!({
1446 "id": message_id,
1447 "sessionID": session_id,
1448 "role": "assistant",
1449 "time": { "created": 1_765_000_000_001i64 },
1450 }),
1451 )?;
1452 std::fs::create_dir_all(root.join("part").join(message_id))?;
1453 Ok(())
1454 }
1455
1456 fn write_json_file(path: &std::path::Path, value: &Value) -> anyhow::Result<()> {
1457 if let Some(parent) = path.parent() {
1458 std::fs::create_dir_all(parent)?;
1459 }
1460 std::fs::write(path, serde_json::to_vec(value)?)?;
1461 Ok(())
1462 }
1463
1464 fn copy_dir(from: &std::path::Path, to: &std::path::Path) -> anyhow::Result<()> {
1465 std::fs::create_dir_all(to)?;
1466 for entry in std::fs::read_dir(from)? {
1467 let entry = entry?;
1468 let source = entry.path();
1469 let target = to.join(entry.file_name());
1470 if entry.file_type()?.is_dir() {
1471 copy_dir(&source, &target)?;
1472 } else {
1473 std::fs::copy(&source, &target)?;
1474 }
1475 }
1476 Ok(())
1477 }
1478}