1use std::collections::HashMap;
2use std::fs;
3use std::hash::{Hash, Hasher};
4use std::io::{BufRead, BufReader, Read};
5use std::path::PathBuf;
6use std::process::{Command, Stdio};
7
8use rusqlite::{Connection, OpenFlags};
9use serde_json::{Value, json};
10
11use crate::error::{Result, XurlError};
12use crate::model::{ProviderKind, ResolutionMeta, ResolvedThread, WriteRequest, WriteResult};
13use crate::provider::{
14 Provider, WriteEventSink, append_passthrough_args, append_passthrough_args_excluding,
15};
16
17#[derive(Debug, Clone)]
18pub struct OpencodeProvider {
19 root: PathBuf,
20}
21
22impl OpencodeProvider {
23 pub fn new(root: impl Into<PathBuf>) -> Self {
24 Self { root: root.into() }
25 }
26
27 fn db_path(&self) -> PathBuf {
28 self.root.join("opencode.db")
29 }
30
31 fn materialized_path(&self, session_id: &str) -> PathBuf {
32 let mut hasher = std::collections::hash_map::DefaultHasher::new();
33 self.root.hash(&mut hasher);
34 let root_key = format!("{:016x}", hasher.finish());
35
36 std::env::temp_dir()
37 .join("xurl-opencode")
38 .join(root_key)
39 .join(format!("{session_id}.jsonl"))
40 }
41
42 fn session_exists(
43 conn: &Connection,
44 session_id: &str,
45 ) -> std::result::Result<bool, rusqlite::Error> {
46 let mut stmt = conn.prepare("SELECT 1 FROM session WHERE id = ?1 LIMIT 1")?;
47 let mut rows = stmt.query([session_id])?;
48 Ok(rows.next()?.is_some())
49 }
50
51 fn fetch_messages(
52 conn: &Connection,
53 session_id: &str,
54 warnings: &mut Vec<String>,
55 ) -> std::result::Result<Vec<(String, Value)>, rusqlite::Error> {
56 let mut stmt = conn.prepare(
57 "SELECT id, data
58 FROM message
59 WHERE session_id = ?1
60 ORDER BY time_created ASC, id ASC",
61 )?;
62
63 let rows = stmt.query_map([session_id], |row| {
64 let id = row.get::<_, String>(0)?;
65 let data = row.get::<_, String>(1)?;
66 Ok((id, data))
67 })?;
68
69 let mut result = Vec::new();
70 for row in rows {
71 let (id, data) = row?;
72 match serde_json::from_str::<Value>(&data) {
73 Ok(value) => result.push((id, value)),
74 Err(err) => warnings.push(format!(
75 "skipped message id={id}: invalid json payload ({err})"
76 )),
77 }
78 }
79
80 Ok(result)
81 }
82
83 fn fetch_parts(
84 conn: &Connection,
85 session_id: &str,
86 warnings: &mut Vec<String>,
87 ) -> std::result::Result<HashMap<String, Vec<Value>>, rusqlite::Error> {
88 let mut stmt = conn.prepare(
89 "SELECT message_id, data
90 FROM part
91 WHERE session_id = ?1
92 ORDER BY time_created ASC, id ASC",
93 )?;
94
95 let rows = stmt.query_map([session_id], |row| {
96 let message_id = row.get::<_, String>(0)?;
97 let data = row.get::<_, String>(1)?;
98 Ok((message_id, data))
99 })?;
100
101 let mut result = HashMap::new();
102 for row in rows {
103 let (message_id, data) = row?;
104 match serde_json::from_str::<Value>(&data) {
105 Ok(value) => {
106 result
107 .entry(message_id)
108 .or_insert_with(Vec::new)
109 .push(value);
110 }
111 Err(err) => warnings.push(format!(
112 "skipped part for message_id={message_id}: invalid json payload ({err})"
113 )),
114 }
115 }
116
117 Ok(result)
118 }
119
120 fn render_jsonl(
121 session_id: &str,
122 messages: Vec<(String, Value)>,
123 mut parts: HashMap<String, Vec<Value>>,
124 ) -> String {
125 let mut lines = Vec::with_capacity(messages.len() + 1);
126 lines.push(json!({
127 "type": "session",
128 "sessionId": session_id,
129 }));
130
131 for (id, message) in messages {
132 lines.push(json!({
133 "type": "message",
134 "id": id,
135 "sessionId": session_id,
136 "message": message,
137 "parts": parts.remove(&id).unwrap_or_default(),
138 }));
139 }
140
141 let mut output = String::new();
142 for line in lines {
143 let encoded = serde_json::to_string(&line).expect("json serialization should succeed");
144 output.push_str(&encoded);
145 output.push('\n');
146 }
147 output
148 }
149
150 fn opencode_bin() -> String {
151 std::env::var("XURL_OPENCODE_BIN").unwrap_or_else(|_| "opencode".to_string())
152 }
153
154 fn spawn_opencode_command(args: &[String]) -> Result<std::process::Child> {
155 let bin = Self::opencode_bin();
156 let mut command = Command::new(&bin);
157 command
158 .args(args)
159 .stdin(Stdio::null())
160 .stdout(Stdio::piped())
161 .stderr(Stdio::piped());
162 command.spawn().map_err(|source| {
163 if source.kind() == std::io::ErrorKind::NotFound {
164 XurlError::CommandNotFound { command: bin }
165 } else {
166 XurlError::Io {
167 path: PathBuf::from(bin),
168 source,
169 }
170 }
171 })
172 }
173
174 fn collect_text(value: Option<&Value>) -> String {
175 match value {
176 Some(Value::String(text)) => text.to_string(),
177 Some(Value::Array(items)) => items
178 .iter()
179 .map(|item| Self::collect_text(Some(item)))
180 .collect::<Vec<_>>()
181 .join(""),
182 Some(Value::Object(map)) => {
183 if map.get("type").and_then(Value::as_str) == Some("text")
184 && let Some(text) = map.get("text").and_then(Value::as_str)
185 {
186 return text.to_string();
187 }
188
189 if let Some(text) = map.get("text").and_then(Value::as_str) {
190 return text.to_string();
191 }
192
193 if let Some(content) = map.get("content") {
194 return Self::collect_text(Some(content));
195 }
196
197 String::new()
198 }
199 _ => String::new(),
200 }
201 }
202
203 fn extract_session_id(value: &Value) -> Option<&str> {
204 value
205 .get("sessionID")
206 .and_then(Value::as_str)
207 .or_else(|| value.get("sessionId").and_then(Value::as_str))
208 }
209
210 fn extract_delta_text(value: &Value) -> Option<String> {
211 value
212 .get("delta")
213 .and_then(Value::as_str)
214 .filter(|text| !text.is_empty())
215 .map(ToString::to_string)
216 .or_else(|| {
217 value
218 .get("textDelta")
219 .and_then(Value::as_str)
220 .filter(|text| !text.is_empty())
221 .map(ToString::to_string)
222 })
223 .or_else(|| {
224 value
225 .get("message")
226 .and_then(Value::as_object)
227 .and_then(|message| message.get("delta"))
228 .and_then(Value::as_str)
229 .filter(|text| !text.is_empty())
230 .map(ToString::to_string)
231 })
232 }
233
234 fn extract_assistant_text(value: &Value) -> Option<String> {
235 if value.get("role").and_then(Value::as_str) == Some("assistant") {
236 let text = Self::collect_text(value.get("content"));
237 if !text.is_empty() {
238 return Some(text);
239 }
240 }
241
242 if let Some(message) = value.get("message")
243 && message.get("role").and_then(Value::as_str) == Some("assistant")
244 {
245 let text = Self::collect_text(message.get("content"));
246 if !text.is_empty() {
247 return Some(text);
248 }
249 }
250
251 value
252 .get("response")
253 .and_then(Value::as_str)
254 .filter(|text| !text.is_empty())
255 .map(ToString::to_string)
256 }
257
258 fn run_write(
259 &self,
260 args: &[String],
261 req: &WriteRequest,
262 sink: &mut dyn WriteEventSink,
263 warnings: Vec<String>,
264 ) -> Result<WriteResult> {
265 let mut child = Self::spawn_opencode_command(args)?;
266 let stdout = child.stdout.take().ok_or_else(|| {
267 XurlError::WriteProtocol("opencode stdout pipe is unavailable".to_string())
268 })?;
269 let stderr = child.stderr.take().ok_or_else(|| {
270 XurlError::WriteProtocol("opencode stderr pipe is unavailable".to_string())
271 })?;
272 let stderr_handle = std::thread::spawn(move || {
273 let mut reader = BufReader::new(stderr);
274 let mut content = String::new();
275 let _ = reader.read_to_string(&mut content);
276 content
277 });
278
279 let stream_path = PathBuf::from("<opencode:stdout>");
280 let mut session_id = req.session_id.clone();
281 let mut final_text = None::<String>;
282 let mut streamed_text = String::new();
283 let mut streamed_delta = false;
284 let mut stream_error = None::<String>;
285 let mut saw_json_event = false;
286 let reader = BufReader::new(stdout);
287 for line in reader.lines() {
288 let line = line.map_err(|source| XurlError::Io {
289 path: stream_path.clone(),
290 source,
291 })?;
292 let trimmed = line.trim();
293 if trimmed.is_empty() {
294 continue;
295 }
296
297 let Ok(value) = serde_json::from_str::<Value>(trimmed) else {
298 continue;
299 };
300 saw_json_event = true;
301
302 if let Some(current_session_id) = Self::extract_session_id(&value)
303 && session_id.as_deref() != Some(current_session_id)
304 {
305 sink.on_session_ready(ProviderKind::Opencode, current_session_id)?;
306 session_id = Some(current_session_id.to_string());
307 }
308
309 if value.get("type").and_then(Value::as_str) == Some("error") {
310 stream_error = value
311 .get("error")
312 .and_then(Value::as_object)
313 .and_then(|error| {
314 error
315 .get("data")
316 .and_then(Value::as_object)
317 .and_then(|data| data.get("message"))
318 .and_then(Value::as_str)
319 .or_else(|| error.get("message").and_then(Value::as_str))
320 })
321 .or_else(|| value.get("message").and_then(Value::as_str))
322 .map(ToString::to_string);
323 continue;
324 }
325
326 if let Some(delta) = Self::extract_delta_text(&value) {
327 sink.on_text_delta(&delta)?;
328 streamed_text.push_str(&delta);
329 final_text = Some(streamed_text.clone());
330 streamed_delta = true;
331 continue;
332 }
333
334 if !streamed_delta && let Some(text) = Self::extract_assistant_text(&value) {
335 sink.on_text_delta(&text)?;
336 final_text = Some(text);
337 }
338 }
339
340 let status = child.wait().map_err(|source| XurlError::Io {
341 path: PathBuf::from(Self::opencode_bin()),
342 source,
343 })?;
344 let stderr_content = stderr_handle.join().unwrap_or_default();
345 if !status.success() {
346 return Err(XurlError::CommandFailed {
347 command: format!("{} {}", Self::opencode_bin(), args.join(" ")),
348 code: status.code(),
349 stderr: stderr_content.trim().to_string(),
350 });
351 }
352
353 if !saw_json_event {
354 return Err(XurlError::WriteProtocol(
355 "opencode output does not contain JSON events".to_string(),
356 ));
357 }
358
359 if let Some(stream_error) = stream_error {
360 return Err(XurlError::WriteProtocol(format!(
361 "opencode stream returned an error: {stream_error}"
362 )));
363 }
364
365 let session_id = if let Some(session_id) = session_id {
366 session_id
367 } else {
368 return Err(XurlError::WriteProtocol(
369 "missing session id in opencode event stream".to_string(),
370 ));
371 };
372
373 Ok(WriteResult {
374 provider: ProviderKind::Opencode,
375 session_id,
376 final_text,
377 warnings,
378 })
379 }
380}
381
382impl Provider for OpencodeProvider {
383 fn kind(&self) -> ProviderKind {
384 ProviderKind::Opencode
385 }
386
387 fn resolve(&self, session_id: &str) -> Result<ResolvedThread> {
388 let db_path = self.db_path();
389 if !db_path.exists() {
390 return Err(XurlError::ThreadNotFound {
391 provider: ProviderKind::Opencode.to_string(),
392 session_id: session_id.to_string(),
393 searched_roots: vec![db_path],
394 });
395 }
396
397 let conn = Connection::open_with_flags(&db_path, OpenFlags::SQLITE_OPEN_READ_ONLY)
398 .map_err(|source| XurlError::Sqlite {
399 path: db_path.clone(),
400 source,
401 })?;
402
403 if !Self::session_exists(&conn, session_id).map_err(|source| XurlError::Sqlite {
404 path: db_path.clone(),
405 source,
406 })? {
407 return Err(XurlError::ThreadNotFound {
408 provider: ProviderKind::Opencode.to_string(),
409 session_id: session_id.to_string(),
410 searched_roots: vec![db_path],
411 });
412 }
413
414 let mut warnings = Vec::new();
415 let messages =
416 Self::fetch_messages(&conn, session_id, &mut warnings).map_err(|source| {
417 XurlError::Sqlite {
418 path: db_path.clone(),
419 source,
420 }
421 })?;
422 let parts = Self::fetch_parts(&conn, session_id, &mut warnings).map_err(|source| {
423 XurlError::Sqlite {
424 path: db_path.clone(),
425 source,
426 }
427 })?;
428
429 let raw = Self::render_jsonl(session_id, messages, parts);
430 let path = self.materialized_path(session_id);
431
432 if let Some(parent) = path.parent() {
433 fs::create_dir_all(parent).map_err(|source| XurlError::Io {
434 path: parent.to_path_buf(),
435 source,
436 })?;
437 }
438
439 fs::write(&path, raw).map_err(|source| XurlError::Io {
440 path: path.clone(),
441 source,
442 })?;
443
444 Ok(ResolvedThread {
445 provider: ProviderKind::Opencode,
446 session_id: session_id.to_string(),
447 path,
448 metadata: ResolutionMeta {
449 source: "opencode:sqlite".to_string(),
450 candidate_count: 1,
451 warnings,
452 },
453 })
454 }
455
456 fn write(&self, req: &WriteRequest, sink: &mut dyn WriteEventSink) -> Result<WriteResult> {
457 let mut warnings = Vec::new();
458 let mut args = vec!["run".to_string(), req.prompt.clone()];
459 if let Some(session_id) = req.session_id.as_deref() {
460 args.push("--session".to_string());
461 args.push(session_id.to_string());
462 } else {
463 }
465 if let Some(role) = req.options.role.as_deref() {
466 args.push("--agent".to_string());
467 args.push(role.to_string());
468 }
469 args.push("--format".to_string());
470 args.push("json".to_string());
471 if req.options.role.is_some() {
472 let ignored =
473 append_passthrough_args_excluding(&mut args, &req.options.params, &["agent"]);
474 if !ignored.is_empty() {
475 warnings.push(
476 "ignored query parameter `agent` because URI role is already set".to_string(),
477 );
478 }
479 } else {
480 append_passthrough_args(&mut args, &req.options.params);
481 }
482 self.run_write(&args, req, sink, warnings)
483 }
484}
485
486#[cfg(test)]
487mod tests {
488 use std::fs;
489 use std::path::Path;
490
491 use rusqlite::{Connection, params};
492 use tempfile::tempdir;
493
494 use crate::provider::Provider;
495 use crate::provider::opencode::OpencodeProvider;
496
497 fn prepare_db(path: &Path) -> Connection {
498 let conn = Connection::open(path).expect("open sqlite");
499 conn.execute_batch(
500 "
501 CREATE TABLE session (
502 id TEXT PRIMARY KEY
503 );
504 CREATE TABLE message (
505 id TEXT PRIMARY KEY,
506 session_id TEXT NOT NULL,
507 time_created INTEGER NOT NULL,
508 data TEXT NOT NULL
509 );
510 CREATE TABLE part (
511 id TEXT PRIMARY KEY,
512 message_id TEXT NOT NULL,
513 session_id TEXT NOT NULL,
514 time_created INTEGER NOT NULL,
515 data TEXT NOT NULL
516 );
517 ",
518 )
519 .expect("create schema");
520 conn
521 }
522
523 #[test]
524 fn resolves_from_sqlite_db() {
525 let temp = tempdir().expect("tempdir");
526 let db = temp.path().join("opencode.db");
527 let conn = prepare_db(&db);
528
529 let session_id = "ses_43a90e3adffejRgrTdlJa48CtE";
530 conn.execute("INSERT INTO session (id) VALUES (?1)", [session_id])
531 .expect("insert session");
532
533 conn.execute(
534 "INSERT INTO message (id, session_id, time_created, data) VALUES (?1, ?2, ?3, ?4)",
535 params![
536 "msg_1",
537 session_id,
538 1_i64,
539 r#"{"role":"user","time":{"created":1}}"#
540 ],
541 )
542 .expect("insert user");
543 conn.execute(
544 "INSERT INTO message (id, session_id, time_created, data) VALUES (?1, ?2, ?3, ?4)",
545 params![
546 "msg_2",
547 session_id,
548 2_i64,
549 r#"{"role":"assistant","time":{"created":2,"completed":3}}"#
550 ],
551 )
552 .expect("insert assistant");
553
554 conn.execute(
555 "INSERT INTO part (id, message_id, session_id, time_created, data) VALUES (?1, ?2, ?3, ?4, ?5)",
556 params![
557 "prt_1",
558 "msg_1",
559 session_id,
560 1_i64,
561 r#"{"type":"text","text":"hello"}"#
562 ],
563 )
564 .expect("insert user part");
565 conn.execute(
566 "INSERT INTO part (id, message_id, session_id, time_created, data) VALUES (?1, ?2, ?3, ?4, ?5)",
567 params![
568 "prt_2",
569 "msg_2",
570 session_id,
571 2_i64,
572 r#"{"type":"text","text":"world"}"#
573 ],
574 )
575 .expect("insert assistant part");
576
577 let provider = OpencodeProvider::new(temp.path());
578 let resolved = provider
579 .resolve(session_id)
580 .expect("resolve should succeed");
581
582 assert_eq!(resolved.metadata.source, "opencode:sqlite");
583 assert!(resolved.path.exists());
584
585 let raw = fs::read_to_string(&resolved.path).expect("read materialized");
586 assert!(raw.contains(r#""type":"session""#));
587 assert!(raw.contains(r#""type":"message""#));
588 assert!(raw.contains(r#""text":"hello""#));
589 assert!(raw.contains(r#""text":"world""#));
590 }
591
592 #[test]
593 fn returns_not_found_when_db_missing() {
594 let temp = tempdir().expect("tempdir");
595 let provider = OpencodeProvider::new(temp.path());
596 let err = provider
597 .resolve("ses_43a90e3adffejRgrTdlJa48CtE")
598 .expect_err("must fail");
599 assert!(format!("{err}").contains("thread not found"));
600 }
601
602 #[test]
603 fn materialized_paths_are_isolated_by_root() {
604 let first_root = tempdir().expect("first tempdir");
605 let second_root = tempdir().expect("second tempdir");
606 let first = OpencodeProvider::new(first_root.path());
607 let second = OpencodeProvider::new(second_root.path());
608 let session_id = "ses_43a90e3adffejRgrTdlJa48CtE";
609
610 let first_path = first.materialized_path(session_id);
611 let second_path = second.materialized_path(session_id);
612
613 assert_ne!(first_path, second_path);
614 }
615}