Skip to main content

xurl_core/provider/
opencode.rs

1use std::collections::HashMap;
2use std::fs;
3use std::hash::{Hash, Hasher};
4use std::io::{BufRead, BufReader, Read};
5use std::path::PathBuf;
6use std::process::{Command, Stdio};
7
8use rusqlite::{Connection, OpenFlags};
9use serde_json::{Value, json};
10
11use crate::error::{Result, XurlError};
12use crate::model::{ProviderKind, ResolutionMeta, ResolvedThread, WriteRequest, WriteResult};
13use crate::provider::{
14    Provider, WriteEventSink, append_passthrough_args, append_passthrough_args_excluding,
15};
16
/// Provider that reads opencode sessions from a data directory and writes to
/// them by invoking the `opencode` command-line tool.
#[derive(Clone, Debug)]
pub struct OpencodeProvider {
    /// Directory that is expected to contain the `opencode.db` SQLite file.
    root: PathBuf,
}
21
22impl OpencodeProvider {
23    pub fn new(root: impl Into<PathBuf>) -> Self {
24        Self { root: root.into() }
25    }
26
27    fn db_path(&self) -> PathBuf {
28        self.root.join("opencode.db")
29    }
30
31    fn materialized_path(&self, session_id: &str) -> PathBuf {
32        let mut hasher = std::collections::hash_map::DefaultHasher::new();
33        self.root.hash(&mut hasher);
34        let root_key = format!("{:016x}", hasher.finish());
35
36        std::env::temp_dir()
37            .join("xurl-opencode")
38            .join(root_key)
39            .join(format!("{session_id}.jsonl"))
40    }
41
42    fn session_exists(
43        conn: &Connection,
44        session_id: &str,
45    ) -> std::result::Result<bool, rusqlite::Error> {
46        let mut stmt = conn.prepare("SELECT 1 FROM session WHERE id = ?1 LIMIT 1")?;
47        let mut rows = stmt.query([session_id])?;
48        Ok(rows.next()?.is_some())
49    }
50
51    fn fetch_messages(
52        conn: &Connection,
53        session_id: &str,
54        warnings: &mut Vec<String>,
55    ) -> std::result::Result<Vec<(String, Value)>, rusqlite::Error> {
56        let mut stmt = conn.prepare(
57            "SELECT id, data
58             FROM message
59             WHERE session_id = ?1
60             ORDER BY time_created ASC, id ASC",
61        )?;
62
63        let rows = stmt.query_map([session_id], |row| {
64            let id = row.get::<_, String>(0)?;
65            let data = row.get::<_, String>(1)?;
66            Ok((id, data))
67        })?;
68
69        let mut result = Vec::new();
70        for row in rows {
71            let (id, data) = row?;
72            match serde_json::from_str::<Value>(&data) {
73                Ok(value) => result.push((id, value)),
74                Err(err) => warnings.push(format!(
75                    "skipped message id={id}: invalid json payload ({err})"
76                )),
77            }
78        }
79
80        Ok(result)
81    }
82
83    fn fetch_parts(
84        conn: &Connection,
85        session_id: &str,
86        warnings: &mut Vec<String>,
87    ) -> std::result::Result<HashMap<String, Vec<Value>>, rusqlite::Error> {
88        let mut stmt = conn.prepare(
89            "SELECT message_id, data
90             FROM part
91             WHERE session_id = ?1
92             ORDER BY time_created ASC, id ASC",
93        )?;
94
95        let rows = stmt.query_map([session_id], |row| {
96            let message_id = row.get::<_, String>(0)?;
97            let data = row.get::<_, String>(1)?;
98            Ok((message_id, data))
99        })?;
100
101        let mut result = HashMap::new();
102        for row in rows {
103            let (message_id, data) = row?;
104            match serde_json::from_str::<Value>(&data) {
105                Ok(value) => {
106                    result
107                        .entry(message_id)
108                        .or_insert_with(Vec::new)
109                        .push(value);
110                }
111                Err(err) => warnings.push(format!(
112                    "skipped part for message_id={message_id}: invalid json payload ({err})"
113                )),
114            }
115        }
116
117        Ok(result)
118    }
119
120    fn render_jsonl(
121        session_id: &str,
122        messages: Vec<(String, Value)>,
123        mut parts: HashMap<String, Vec<Value>>,
124    ) -> String {
125        let mut lines = Vec::with_capacity(messages.len() + 1);
126        lines.push(json!({
127            "type": "session",
128            "sessionId": session_id,
129        }));
130
131        for (id, message) in messages {
132            lines.push(json!({
133                "type": "message",
134                "id": id,
135                "sessionId": session_id,
136                "message": message,
137                "parts": parts.remove(&id).unwrap_or_default(),
138            }));
139        }
140
141        let mut output = String::new();
142        for line in lines {
143            let encoded = serde_json::to_string(&line).expect("json serialization should succeed");
144            output.push_str(&encoded);
145            output.push('\n');
146        }
147        output
148    }
149
/// Returns the executable used to run opencode.
///
/// The value of the `XURL_OPENCODE_BIN` environment variable is used when it
/// can be read; otherwise the name `opencode` is returned.
fn opencode_bin() -> String {
    match std::env::var("XURL_OPENCODE_BIN") {
        Ok(custom) => custom,
        Err(_) => String::from("opencode"),
    }
}
153
154    fn spawn_opencode_command(args: &[String]) -> Result<std::process::Child> {
155        let bin = Self::opencode_bin();
156        let mut command = Command::new(&bin);
157        command
158            .args(args)
159            .stdin(Stdio::null())
160            .stdout(Stdio::piped())
161            .stderr(Stdio::piped());
162        command.spawn().map_err(|source| {
163            if source.kind() == std::io::ErrorKind::NotFound {
164                XurlError::CommandNotFound { command: bin }
165            } else {
166                XurlError::Io {
167                    path: PathBuf::from(bin),
168                    source,
169                }
170            }
171        })
172    }
173
174    fn collect_text(value: Option<&Value>) -> String {
175        match value {
176            Some(Value::String(text)) => text.to_string(),
177            Some(Value::Array(items)) => items
178                .iter()
179                .map(|item| Self::collect_text(Some(item)))
180                .collect::<Vec<_>>()
181                .join(""),
182            Some(Value::Object(map)) => {
183                if map.get("type").and_then(Value::as_str) == Some("text")
184                    && let Some(text) = map.get("text").and_then(Value::as_str)
185                {
186                    return text.to_string();
187                }
188
189                if let Some(text) = map.get("text").and_then(Value::as_str) {
190                    return text.to_string();
191                }
192
193                if let Some(content) = map.get("content") {
194                    return Self::collect_text(Some(content));
195                }
196
197                String::new()
198            }
199            _ => String::new(),
200        }
201    }
202
203    fn extract_session_id(value: &Value) -> Option<&str> {
204        value
205            .get("sessionID")
206            .and_then(Value::as_str)
207            .or_else(|| value.get("sessionId").and_then(Value::as_str))
208    }
209
210    fn extract_delta_text(value: &Value) -> Option<String> {
211        value
212            .get("delta")
213            .and_then(Value::as_str)
214            .filter(|text| !text.is_empty())
215            .map(ToString::to_string)
216            .or_else(|| {
217                value
218                    .get("textDelta")
219                    .and_then(Value::as_str)
220                    .filter(|text| !text.is_empty())
221                    .map(ToString::to_string)
222            })
223            .or_else(|| {
224                value
225                    .get("message")
226                    .and_then(Value::as_object)
227                    .and_then(|message| message.get("delta"))
228                    .and_then(Value::as_str)
229                    .filter(|text| !text.is_empty())
230                    .map(ToString::to_string)
231            })
232    }
233
234    fn extract_assistant_text(value: &Value) -> Option<String> {
235        if value.get("role").and_then(Value::as_str) == Some("assistant") {
236            let text = Self::collect_text(value.get("content"));
237            if !text.is_empty() {
238                return Some(text);
239            }
240        }
241
242        if let Some(message) = value.get("message")
243            && message.get("role").and_then(Value::as_str) == Some("assistant")
244        {
245            let text = Self::collect_text(message.get("content"));
246            if !text.is_empty() {
247                return Some(text);
248            }
249        }
250
251        value
252            .get("response")
253            .and_then(Value::as_str)
254            .filter(|text| !text.is_empty())
255            .map(ToString::to_string)
256    }
257
258    fn run_write(
259        &self,
260        args: &[String],
261        req: &WriteRequest,
262        sink: &mut dyn WriteEventSink,
263        warnings: Vec<String>,
264    ) -> Result<WriteResult> {
265        let mut child = Self::spawn_opencode_command(args)?;
266        let stdout = child.stdout.take().ok_or_else(|| {
267            XurlError::WriteProtocol("opencode stdout pipe is unavailable".to_string())
268        })?;
269        let stderr = child.stderr.take().ok_or_else(|| {
270            XurlError::WriteProtocol("opencode stderr pipe is unavailable".to_string())
271        })?;
272        let stderr_handle = std::thread::spawn(move || {
273            let mut reader = BufReader::new(stderr);
274            let mut content = String::new();
275            let _ = reader.read_to_string(&mut content);
276            content
277        });
278
279        let stream_path = PathBuf::from("<opencode:stdout>");
280        let mut session_id = req.session_id.clone();
281        let mut final_text = None::<String>;
282        let mut streamed_text = String::new();
283        let mut streamed_delta = false;
284        let mut stream_error = None::<String>;
285        let mut saw_json_event = false;
286        let reader = BufReader::new(stdout);
287        for line in reader.lines() {
288            let line = line.map_err(|source| XurlError::Io {
289                path: stream_path.clone(),
290                source,
291            })?;
292            let trimmed = line.trim();
293            if trimmed.is_empty() {
294                continue;
295            }
296
297            let Ok(value) = serde_json::from_str::<Value>(trimmed) else {
298                continue;
299            };
300            saw_json_event = true;
301
302            if let Some(current_session_id) = Self::extract_session_id(&value)
303                && session_id.as_deref() != Some(current_session_id)
304            {
305                sink.on_session_ready(ProviderKind::Opencode, current_session_id)?;
306                session_id = Some(current_session_id.to_string());
307            }
308
309            if value.get("type").and_then(Value::as_str) == Some("error") {
310                stream_error = value
311                    .get("error")
312                    .and_then(Value::as_object)
313                    .and_then(|error| {
314                        error
315                            .get("data")
316                            .and_then(Value::as_object)
317                            .and_then(|data| data.get("message"))
318                            .and_then(Value::as_str)
319                            .or_else(|| error.get("message").and_then(Value::as_str))
320                    })
321                    .or_else(|| value.get("message").and_then(Value::as_str))
322                    .map(ToString::to_string);
323                continue;
324            }
325
326            if let Some(delta) = Self::extract_delta_text(&value) {
327                sink.on_text_delta(&delta)?;
328                streamed_text.push_str(&delta);
329                final_text = Some(streamed_text.clone());
330                streamed_delta = true;
331                continue;
332            }
333
334            if !streamed_delta && let Some(text) = Self::extract_assistant_text(&value) {
335                sink.on_text_delta(&text)?;
336                final_text = Some(text);
337            }
338        }
339
340        let status = child.wait().map_err(|source| XurlError::Io {
341            path: PathBuf::from(Self::opencode_bin()),
342            source,
343        })?;
344        let stderr_content = stderr_handle.join().unwrap_or_default();
345        if !status.success() {
346            return Err(XurlError::CommandFailed {
347                command: format!("{} {}", Self::opencode_bin(), args.join(" ")),
348                code: status.code(),
349                stderr: stderr_content.trim().to_string(),
350            });
351        }
352
353        if !saw_json_event {
354            return Err(XurlError::WriteProtocol(
355                "opencode output does not contain JSON events".to_string(),
356            ));
357        }
358
359        if let Some(stream_error) = stream_error {
360            return Err(XurlError::WriteProtocol(format!(
361                "opencode stream returned an error: {stream_error}"
362            )));
363        }
364
365        let session_id = if let Some(session_id) = session_id {
366            session_id
367        } else {
368            return Err(XurlError::WriteProtocol(
369                "missing session id in opencode event stream".to_string(),
370            ));
371        };
372
373        Ok(WriteResult {
374            provider: ProviderKind::Opencode,
375            session_id,
376            final_text,
377            warnings,
378        })
379    }
380}
381
382impl Provider for OpencodeProvider {
383    fn kind(&self) -> ProviderKind {
384        ProviderKind::Opencode
385    }
386
387    fn resolve(&self, session_id: &str) -> Result<ResolvedThread> {
388        let db_path = self.db_path();
389        if !db_path.exists() {
390            return Err(XurlError::ThreadNotFound {
391                provider: ProviderKind::Opencode.to_string(),
392                session_id: session_id.to_string(),
393                searched_roots: vec![db_path],
394            });
395        }
396
397        let conn = Connection::open_with_flags(&db_path, OpenFlags::SQLITE_OPEN_READ_ONLY)
398            .map_err(|source| XurlError::Sqlite {
399                path: db_path.clone(),
400                source,
401            })?;
402
403        if !Self::session_exists(&conn, session_id).map_err(|source| XurlError::Sqlite {
404            path: db_path.clone(),
405            source,
406        })? {
407            return Err(XurlError::ThreadNotFound {
408                provider: ProviderKind::Opencode.to_string(),
409                session_id: session_id.to_string(),
410                searched_roots: vec![db_path],
411            });
412        }
413
414        let mut warnings = Vec::new();
415        let messages =
416            Self::fetch_messages(&conn, session_id, &mut warnings).map_err(|source| {
417                XurlError::Sqlite {
418                    path: db_path.clone(),
419                    source,
420                }
421            })?;
422        let parts = Self::fetch_parts(&conn, session_id, &mut warnings).map_err(|source| {
423            XurlError::Sqlite {
424                path: db_path.clone(),
425                source,
426            }
427        })?;
428
429        let raw = Self::render_jsonl(session_id, messages, parts);
430        let path = self.materialized_path(session_id);
431
432        if let Some(parent) = path.parent() {
433            fs::create_dir_all(parent).map_err(|source| XurlError::Io {
434                path: parent.to_path_buf(),
435                source,
436            })?;
437        }
438
439        fs::write(&path, raw).map_err(|source| XurlError::Io {
440            path: path.clone(),
441            source,
442        })?;
443
444        Ok(ResolvedThread {
445            provider: ProviderKind::Opencode,
446            session_id: session_id.to_string(),
447            path,
448            metadata: ResolutionMeta {
449                source: "opencode:sqlite".to_string(),
450                candidate_count: 1,
451                warnings,
452            },
453        })
454    }
455
456    fn write(&self, req: &WriteRequest, sink: &mut dyn WriteEventSink) -> Result<WriteResult> {
457        let mut warnings = Vec::new();
458        let mut args = vec!["run".to_string(), req.prompt.clone()];
459        if let Some(session_id) = req.session_id.as_deref() {
460            args.push("--session".to_string());
461            args.push(session_id.to_string());
462        } else {
463            // keep create flow without session binding
464        }
465        if let Some(role) = req.options.role.as_deref() {
466            args.push("--agent".to_string());
467            args.push(role.to_string());
468        }
469        args.push("--format".to_string());
470        args.push("json".to_string());
471        if req.options.role.is_some() {
472            let ignored =
473                append_passthrough_args_excluding(&mut args, &req.options.params, &["agent"]);
474            if !ignored.is_empty() {
475                warnings.push(
476                    "ignored query parameter `agent` because URI role is already set".to_string(),
477                );
478            }
479        } else {
480            append_passthrough_args(&mut args, &req.options.params);
481        }
482        self.run_write(&args, req, sink, warnings)
483    }
484}
485
486#[cfg(test)]
487mod tests {
488    use std::fs;
489    use std::path::Path;
490
491    use rusqlite::{Connection, params};
492    use tempfile::tempdir;
493
494    use crate::provider::Provider;
495    use crate::provider::opencode::OpencodeProvider;
496
497    fn prepare_db(path: &Path) -> Connection {
498        let conn = Connection::open(path).expect("open sqlite");
499        conn.execute_batch(
500            "
501            CREATE TABLE session (
502                id TEXT PRIMARY KEY
503            );
504            CREATE TABLE message (
505                id TEXT PRIMARY KEY,
506                session_id TEXT NOT NULL,
507                time_created INTEGER NOT NULL,
508                data TEXT NOT NULL
509            );
510            CREATE TABLE part (
511                id TEXT PRIMARY KEY,
512                message_id TEXT NOT NULL,
513                session_id TEXT NOT NULL,
514                time_created INTEGER NOT NULL,
515                data TEXT NOT NULL
516            );
517            ",
518        )
519        .expect("create schema");
520        conn
521    }
522
/// A session with two messages and one text part each resolves to a
/// materialized JSONL file that contains the session record, message records,
/// and both part texts.
#[test]
fn resolves_from_sqlite_db() {
    const INSERT_MESSAGE: &str =
        "INSERT INTO message (id, session_id, time_created, data) VALUES (?1, ?2, ?3, ?4)";
    const INSERT_PART: &str = "INSERT INTO part (id, message_id, session_id, time_created, data) VALUES (?1, ?2, ?3, ?4, ?5)";

    let temp = tempdir().expect("tempdir");
    let db = temp.path().join("opencode.db");
    let conn = prepare_db(&db);

    let session_id = "ses_43a90e3adffejRgrTdlJa48CtE";
    conn.execute("INSERT INTO session (id) VALUES (?1)", [session_id])
        .expect("insert session");

    // (id, time_created, data, failure label)
    let messages = [
        (
            "msg_1",
            1_i64,
            r#"{"role":"user","time":{"created":1}}"#,
            "insert user",
        ),
        (
            "msg_2",
            2_i64,
            r#"{"role":"assistant","time":{"created":2,"completed":3}}"#,
            "insert assistant",
        ),
    ];
    for (id, created, data, label) in messages {
        conn.execute(INSERT_MESSAGE, params![id, session_id, created, data])
            .expect(label);
    }

    // (id, message_id, time_created, data, failure label)
    let parts = [
        (
            "prt_1",
            "msg_1",
            1_i64,
            r#"{"type":"text","text":"hello"}"#,
            "insert user part",
        ),
        (
            "prt_2",
            "msg_2",
            2_i64,
            r#"{"type":"text","text":"world"}"#,
            "insert assistant part",
        ),
    ];
    for (id, message_id, created, data, label) in parts {
        conn.execute(
            INSERT_PART,
            params![id, message_id, session_id, created, data],
        )
        .expect(label);
    }

    let resolved = OpencodeProvider::new(temp.path())
        .resolve(session_id)
        .expect("resolve should succeed");

    assert_eq!(resolved.metadata.source, "opencode:sqlite");
    assert!(resolved.path.exists());

    let raw = fs::read_to_string(&resolved.path).expect("read materialized");
    assert!(raw.contains(r#""type":"session""#));
    assert!(raw.contains(r#""type":"message""#));
    assert!(raw.contains(r#""text":"hello""#));
    assert!(raw.contains(r#""text":"world""#));
}
591
/// Resolving against a root directory without `opencode.db` fails with a
/// "thread not found" error.
#[test]
fn returns_not_found_when_db_missing() {
    let temp = tempdir().expect("tempdir");
    let outcome = OpencodeProvider::new(temp.path()).resolve("ses_43a90e3adffejRgrTdlJa48CtE");

    let err = outcome.expect_err("must fail");
    assert!(err.to_string().contains("thread not found"));
}
601
/// The same session id maps to different materialized paths for providers
/// with different roots.
#[test]
fn materialized_paths_are_isolated_by_root() {
    let session_id = "ses_43a90e3adffejRgrTdlJa48CtE";
    let first_root = tempdir().expect("first tempdir");
    let second_root = tempdir().expect("second tempdir");

    let path_under = |root: &Path| OpencodeProvider::new(root).materialized_path(session_id);
    let first_path = path_under(first_root.path());
    let second_path = path_under(second_root.path());

    assert_ne!(first_path, second_path);
}
615}