Skip to main content

smol_workflow_engine/agent_providers/
opencode.rs

1use super::common::*;
2use super::types::*;
3use anyhow::{bail, Context};
4use serde_json::{json, Value};
5use std::collections::HashMap;
6use std::path::PathBuf;
7use std::process::Stdio;
8use std::time::Duration;
9use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
10use tokio::process::{Child, Command};
11use tokio::sync::mpsc;
12
/// Configuration for the OpenCode agent provider.
#[derive(Debug, Clone)]
pub struct OpenCodeAgentProviderOptions {
    /// Executable to launch; the runners in this file fall back to `"opencode"` when `None`.
    pub command: Option<String>,
    /// Leading arguments for one-shot CLI runs.
    pub subcommand: Vec<String>,
    /// Extra arguments appended after `subcommand` for CLI runs.
    pub args: Vec<String>,
    /// Leading arguments used when launching the HTTP server.
    pub server_subcommand: Vec<String>,
    /// Extra arguments appended after `server_subcommand`.
    pub server_args: Vec<String>,
    /// Sent as `format.retryCount` in structured-output requests.
    pub structured_output_retry_count: u64,
    /// How long to wait for the server to announce its URL, in milliseconds.
    pub server_startup_timeout_ms: u64,
    /// Working directory used when the run input does not provide one.
    pub cwd: Option<PathBuf>,
    /// Environment variables added to spawned processes.
    pub env: HashMap<String, String>,
    /// Timeout handed to the command runner for CLI runs, in milliseconds.
    pub timeout_ms: Option<u64>,
}

impl Default for OpenCodeAgentProviderOptions {
    /// Defaults: `run` / `serve` subcommands, two structured-output retries,
    /// a 15 second server startup timeout, and nothing else configured.
    fn default() -> Self {
        let run_subcommand = vec![String::from("run")];
        let serve_subcommand = vec![String::from("serve")];
        Self {
            command: None,
            subcommand: run_subcommand,
            args: vec![],
            server_subcommand: serve_subcommand,
            server_args: vec![],
            structured_output_retry_count: 2,
            server_startup_timeout_ms: 15_000,
            cwd: None,
            env: HashMap::default(),
            timeout_ms: None,
        }
    }
}
43
44#[derive(Debug, Clone, Default)]
45pub struct OpenCodeAgentProvider {
46    options: OpenCodeAgentProviderOptions,
47}
48
49impl OpenCodeAgentProvider {
50    pub fn new(options: OpenCodeAgentProviderOptions) -> Self {
51        Self { options }
52    }
53}
54
55#[async_trait::async_trait]
56impl AgentProvider for OpenCodeAgentProvider {
57    fn name(&self) -> &str {
58        "opencode"
59    }
60    fn schema_mode(&self) -> AgentProviderSchemaMode {
61        AgentProviderSchemaMode::Builtin
62    }
63    fn usage_mode(&self) -> AgentProviderUsageMode {
64        AgentProviderUsageMode::Builtin
65    }
66    async fn run(&self, input: AgentProviderRunInput) -> anyhow::Result<AgentProviderResult> {
67        if option_schema(&input.options).is_some() {
68            run_opencode_structured(input, &self.options).await
69        } else {
70            run_opencode(input, &self.options).await
71        }
72    }
73}
74
75const MAX_PROMPT_ARG_LENGTH: usize = 32_000;
76
77async fn run_opencode(
78    input: AgentProviderRunInput,
79    options: &OpenCodeAgentProviderOptions,
80) -> anyhow::Result<AgentProviderResult> {
81    if input.prompt.len() > MAX_PROMPT_ARG_LENGTH {
82        return run_opencode_via_server(input, options).await;
83    }
84
85    let command = options.command.as_deref().unwrap_or("opencode");
86    let mut args = Vec::new();
87    args.extend(options.subcommand.clone());
88    args.extend(options.args.clone());
89    args.extend(["--format".into(), "json".into()]);
90    if let Some(model) = option_str(&input.options, "model") {
91        args.extend(["--model".into(), model]);
92    }
93    if let Some(thinking) = option_str(&input.options, "thinking") {
94        args.extend(["--variant".into(), thinking]);
95    }
96    if let Some(agent_type) = option_str(&input.options, "agentType") {
97        args.extend(["--agent".into(), agent_type]);
98    }
99    args.push(input.prompt.clone());
100    let cwd = input.context.cwd.as_deref().or(options.cwd.as_deref());
101    let (stdout, stderr) = run_command(
102        "OpenCode",
103        command,
104        &args,
105        None,
106        cwd,
107        &options.env,
108        options.timeout_ms,
109    )
110    .await?;
111    let parsed = parse_output(&stdout);
112    let events = match &parsed {
113        Value::Array(items) => items.clone(),
114        value => vec![value.clone()],
115    };
116    let candidate = extract_output(&parsed).unwrap_or(stdout);
117    let session_id = extract_session_id(&parsed)
118        .context("OpenCode provider response did not include a session id")?;
119    Ok(AgentProviderResult {
120        output: Value::String(candidate.trim_end().to_string()),
121        session_id: Some(session_id),
122        model: extract_model(&parsed).or_else(|| option_model(&input.options)),
123        usage: extract_usage(&parsed, true),
124        isolation: None,
125        raw: Some(to_json_value(
126            json!({ "events": events, "response": parsed, "stderr": stderr }),
127        )),
128    })
129}
130
131async fn run_opencode_via_server(
132    input: AgentProviderRunInput,
133    options: &OpenCodeAgentProviderOptions,
134) -> anyhow::Result<AgentProviderResult> {
135    let command = options.command.as_deref().unwrap_or("opencode");
136    let mut server = start_opencode_server(command, options, &input).await?;
137    let directory = input
138        .context
139        .cwd
140        .as_ref()
141        .or(options.cwd.as_ref())
142        .cloned()
143        .unwrap_or(std::env::current_dir()?);
144    let mut session_body = json!({
145        "title": "smol-workflows agent call",
146    });
147    if let Some(agent_type) = option_str(&input.options, "agentType") {
148        session_body["agent"] = Value::String(agent_type);
149    }
150    let session = request_json(
151        &server.url,
152        "/session",
153        "POST",
154        &[("directory", directory.to_string_lossy().to_string())],
155        &session_body,
156    )
157    .await?;
158    let session_id = extract_session_id(&session)
159        .or_else(|| {
160            session
161                .get("id")
162                .and_then(Value::as_str)
163                .map(ToString::to_string)
164        })
165        .ok_or_else(|| {
166            anyhow::anyhow!(
167                "OpenCode create-session response did not include a session id: {session}"
168            )
169        })?;
170
171    let model = option_str(&input.options, "model")
172        .map(|model| split_model(&model))
173        .transpose()?;
174    let mut body = json!({
175        "parts": [{ "type": "text", "text": input.prompt }],
176    });
177    if let Some(model) = model {
178        body["model"] = model;
179    }
180    if let Some(thinking) = option_str(&input.options, "thinking") {
181        body["variant"] = Value::String(thinking);
182    }
183    if let Some(agent_type) = option_str(&input.options, "agentType") {
184        body["agent"] = Value::String(agent_type);
185    }
186    let response = request_json(
187        &server.url,
188        &format!("/session/{}/message", url_encode(&session_id)),
189        "POST",
190        &[("directory", directory.to_string_lossy().to_string())],
191        &body,
192    )
193    .await?;
194    let output = extract_output(&response).ok_or_else(|| {
195        anyhow::anyhow!("OpenCode response did not include a final assistant message")
196    })?;
197    let logs = server.logs.clone();
198    server.stop().await;
199    Ok(AgentProviderResult {
200        output: Value::String(output.trim_end().to_string()),
201        session_id: Some(session_id),
202        model: extract_model(&response)
203            .or_else(|| extract_model(&session))
204            .or_else(|| option_model(&input.options)),
205        usage: extract_usage(&response, true),
206        isolation: None,
207        raw: Some(to_json_value(
208            json!({ "events": [session, response], "session": session, "response": response, "serverLogs": logs }),
209        )),
210    })
211}
212
213async fn run_opencode_structured(
214    input: AgentProviderRunInput,
215    options: &OpenCodeAgentProviderOptions,
216) -> anyhow::Result<AgentProviderResult> {
217    let command = options.command.as_deref().unwrap_or("opencode");
218    let mut server = start_opencode_server(command, options, &input).await?;
219    let directory = input
220        .context
221        .cwd
222        .as_ref()
223        .or(options.cwd.as_ref())
224        .cloned()
225        .unwrap_or(std::env::current_dir()?);
226    let mut session_body = json!({
227        "title": "smol-workflows structured output",
228    });
229    if let Some(agent_type) = option_str(&input.options, "agentType") {
230        session_body["agent"] = Value::String(agent_type);
231    }
232    let session = request_json(
233        &server.url,
234        "/session",
235        "POST",
236        &[("directory", directory.to_string_lossy().to_string())],
237        &session_body,
238    )
239    .await?;
240    let session_id = extract_session_id(&session)
241        .or_else(|| {
242            session
243                .get("id")
244                .and_then(Value::as_str)
245                .map(ToString::to_string)
246        })
247        .ok_or_else(|| {
248            anyhow::anyhow!(
249                "OpenCode create-session response did not include a session id: {session}"
250            )
251        })?;
252
253    let model = option_str(&input.options, "model")
254        .map(|model| split_model(&model))
255        .transpose()?;
256    let mut body = json!({
257        "parts": [{ "type": "text", "text": input.prompt }],
258        "format": {
259            "type": "json_schema",
260            "schema": option_schema(&input.options).cloned(),
261            "retryCount": options.structured_output_retry_count,
262        }
263    });
264    if let Some(model) = model {
265        body["model"] = model;
266    }
267    if let Some(thinking) = option_str(&input.options, "thinking") {
268        body["variant"] = Value::String(thinking);
269    }
270    if let Some(agent_type) = option_str(&input.options, "agentType") {
271        body["agent"] = Value::String(agent_type);
272    }
273    let response = request_json(
274        &server.url,
275        &format!("/session/{}/message", url_encode(&session_id)),
276        "POST",
277        &[("directory", directory.to_string_lossy().to_string())],
278        &body,
279    )
280    .await?;
281    let output = extract_structured_output(&response).ok_or_else(|| {
282        anyhow::anyhow!("OpenCode structured-output response did not include a structured value")
283    })?;
284    let logs = server.logs.clone();
285    server.stop().await;
286    Ok(AgentProviderResult {
287        output,
288        session_id: Some(session_id),
289        model: extract_model(&response)
290            .or_else(|| extract_model(&session))
291            .or_else(|| option_model(&input.options)),
292        usage: extract_usage(&response, true),
293        isolation: None,
294        raw: Some(to_json_value(
295            json!({ "events": [session, response], "session": session, "response": response, "serverLogs": logs }),
296        )),
297    })
298}
299
300struct OpenCodeServer {
301    child: Child,
302    url: String,
303    logs: String,
304}
305impl OpenCodeServer {
306    async fn stop(&mut self) {
307        let _ = self.child.start_kill();
308        let _ = self.child.wait().await;
309    }
310}
311impl Drop for OpenCodeServer {
312    fn drop(&mut self) {
313        let _ = self.child.start_kill();
314    }
315}
316
317async fn start_opencode_server(
318    command: &str,
319    options: &OpenCodeAgentProviderOptions,
320    input: &AgentProviderRunInput,
321) -> anyhow::Result<OpenCodeServer> {
322    let mut args = Vec::new();
323    args.extend(options.server_subcommand.clone());
324    args.extend(options.server_args.clone());
325    args.extend([
326        "--hostname".into(),
327        "127.0.0.1".into(),
328        "--port".into(),
329        "0".into(),
330    ]);
331    let mut cmd = Command::new(command);
332    cmd.args(&args)
333        .stdout(Stdio::piped())
334        .stderr(Stdio::piped())
335        .stdin(Stdio::null());
336    if let Some(cwd) = input.context.cwd.as_ref().or(options.cwd.as_ref()) {
337        cmd.current_dir(cwd);
338    }
339    cmd.envs(&options.env);
340    let mut child = cmd.spawn().context("failed to spawn OpenCode server")?;
341    let stdout = child
342        .stdout
343        .take()
344        .context("failed to capture OpenCode server stdout")?;
345    let stderr = child
346        .stderr
347        .take()
348        .context("failed to capture OpenCode server stderr")?;
349    let (tx, mut rx) = mpsc::unbounded_channel::<String>();
350    spawn_reader(stdout, tx.clone());
351    spawn_reader(stderr, tx);
352    let deadline =
353        tokio::time::Instant::now() + Duration::from_millis(options.server_startup_timeout_ms);
354    let mut logs = String::new();
355
356    loop {
357        if let Some(status) = child.try_wait()? {
358            bail!(
359                "OpenCode server exited before it was ready with code {:?}{}",
360                status.code(),
361                if logs.is_empty() {
362                    String::new()
363                } else {
364                    format!(": {}", truncate(&logs, 4000))
365                }
366            );
367        }
368        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
369        if remaining.is_zero() {
370            let _ = child.start_kill();
371            bail!(
372                "Timed out waiting for OpenCode server URL{}",
373                if logs.is_empty() {
374                    String::new()
375                } else {
376                    format!(": {}", truncate(&logs, 4000))
377                }
378            );
379        }
380        tokio::select! {
381            Some(chunk) = rx.recv() => {
382                logs.push_str(&chunk);
383                if let Some(url) = extract_server_url(&logs) {
384                    return Ok(OpenCodeServer { child, url, logs });
385                }
386            }
387            _ = tokio::time::sleep(remaining.min(Duration::from_millis(50))) => {}
388        }
389    }
390}
391
392fn spawn_reader<R: AsyncRead + Unpin + Send + 'static>(
393    reader: R,
394    tx: mpsc::UnboundedSender<String>,
395) {
396    tokio::spawn(async move {
397        let mut lines = BufReader::new(reader).lines();
398        while let Ok(Some(line)) = lines.next_line().await {
399            let _ = tx.send(format!("{line}\n"));
400        }
401    });
402}
403
/// Returns the whitespace-delimited token that follows the first
/// `opencode server listening on ` marker in `logs`, if any.
fn extract_server_url(logs: &str) -> Option<String> {
    const MARKER: &str = "opencode server listening on ";
    let (_, after_marker) = logs.split_once(MARKER)?;
    after_marker.split_whitespace().next().map(str::to_owned)
}
410
411async fn request_json(
412    base: &str,
413    path: &str,
414    method: &str,
415    query: &[(impl AsRef<str>, String)],
416    body: &Value,
417) -> anyhow::Result<Value> {
418    if method != "POST" {
419        bail!("unsupported method {method}");
420    }
421
422    let url = build_url(base, path, query);
423    let response = reqwest::Client::new().post(url).json(body).send().await?;
424    let status = response.status();
425    let text = response.text().await?;
426    if !status.is_success() {
427        bail!(
428            "OpenCode {method} {path} failed with HTTP {status}: {}",
429            if text.trim().is_empty() {
430                "<empty response body>".to_string()
431            } else {
432                truncate(&text, 4000)
433            }
434        );
435    }
436    Ok(if text.trim().is_empty() {
437        Value::Null
438    } else {
439        serde_json::from_str(&text)?
440    })
441}
442
443fn build_url(base: &str, path: &str, query: &[(impl AsRef<str>, String)]) -> String {
444    let mut url = format!("{}{}", base.trim_end_matches('/'), path);
445    if !query.is_empty() {
446        url.push('?');
447        url.push_str(
448            &query
449                .iter()
450                .map(|(key, value)| format!("{}={}", key.as_ref(), url_encode(value)))
451                .collect::<Vec<_>>()
452                .join("&"),
453        );
454    }
455    url
456}
457
/// Percent-encodes `value` for use as a URL path segment or query value.
///
/// Every byte outside the RFC 3986 "unreserved" set (`A-Z a-z 0-9 - . _ ~`)
/// is emitted as `%XX` with uppercase hex digits. Non-ASCII text is encoded
/// byte by byte from its UTF-8 representation. Characters such as `&`, `=`,
/// `#`, `?` and `+` are therefore escaped and cannot alter the URL structure.
fn url_encode(value: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";
    let mut encoded = String::with_capacity(value.len());
    for byte in value.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                encoded.push(char::from(byte));
            }
            _ => {
                encoded.push('%');
                encoded.push(char::from(HEX[usize::from(byte >> 4)]));
                encoded.push(char::from(HEX[usize::from(byte & 0x0F)]));
            }
        }
    }
    encoded
}
464
465fn split_model(model: &str) -> anyhow::Result<Value> {
466    let Some((provider, model_id)) = model.split_once('/') else {
467        bail!("OpenCode model must use provider/model form for structured output, got: {model}")
468    };
469    if provider.is_empty() || model_id.is_empty() {
470        bail!("OpenCode model must use provider/model form for structured output, got: {model}");
471    }
472    Ok(json!({ "providerID": provider, "modelID": model_id }))
473}
474
475fn parse_output(stdout: &str) -> Value {
476    let trimmed = stdout.trim();
477    if trimmed.is_empty() {
478        return Value::String(String::new());
479    }
480    serde_json::from_str(trimmed).unwrap_or_else(|_| {
481        let events = parse_json_lines(stdout);
482        if events.is_empty() {
483            Value::String(stdout.to_string())
484        } else {
485            Value::Array(events)
486        }
487    })
488}
489
490fn extract_structured_output(value: &Value) -> Option<Value> {
491    match value {
492        Value::Array(items) => items.iter().find_map(extract_structured_output),
493        Value::Object(record) => {
494            for key in ["structured", "structured_output", "structuredOutput"] {
495                if record.contains_key(key) {
496                    return record.get(key).cloned();
497                }
498            }
499            if record.get("type").and_then(Value::as_str) == Some("tool")
500                && record.get("tool").and_then(Value::as_str) == Some("StructuredOutput")
501            {
502                if let Some(input) = get_path(value, &["state", "input"]) {
503                    return Some(input.clone());
504                }
505            }
506            record.values().find_map(extract_structured_output)
507        }
508        _ => None,
509    }
510}
511
512fn extract_output(raw: &Value) -> Option<String> {
513    match raw {
514        Value::String(text) => Some(text.clone()),
515        Value::Array(items) => items.iter().rev().find_map(extract_output),
516        Value::Object(record) => {
517            if record.get("type").and_then(Value::as_str) == Some("text") {
518                if let Some(text) = record.get("part").and_then(extract_text) {
519                    return Some(text);
520                }
521            }
522            for key in ["result", "output", "text", "message", "content", "parts"] {
523                if let Some(text) = record.get(key).and_then(extract_text) {
524                    if !text.is_empty() {
525                        return Some(text);
526                    }
527                }
528            }
529            for key in ["data", "item", "event", "properties"] {
530                if let Some(value) = record.get(key).and_then(extract_output) {
531                    if !value.is_empty() {
532                        return Some(value);
533                    }
534                }
535            }
536            None
537        }
538        _ => None,
539    }
540}
541
542fn extract_text(value: &Value) -> Option<String> {
543    match value {
544        Value::String(text) => Some(text.clone()),
545        Value::Array(items) => {
546            let text = items
547                .iter()
548                .map(|item| extract_text(item).unwrap_or_default())
549                .collect::<Vec<_>>()
550                .join("");
551            (!text.is_empty()).then_some(text)
552        }
553        Value::Object(record) => record
554            .get("text")
555            .or_else(|| record.get("content"))
556            .or_else(|| record.get("message"))
557            .or_else(|| record.get("parts"))
558            .and_then(extract_text),
559        _ => None,
560    }
561}
562
563fn extract_session_id(raw: &Value) -> Option<String> {
564    match raw {
565        Value::Array(items) => items.iter().find_map(extract_session_id),
566        Value::Object(record) => {
567            for key in ["sessionID", "sessionId", "session_id"] {
568                if let Some(value) = record.get(key).and_then(Value::as_str) {
569                    return Some(value.to_string());
570                }
571            }
572            record.values().find_map(extract_session_id)
573        }
574        _ => None,
575    }
576}
577
578fn extract_usage(raw: &Value, sum: bool) -> Option<AgentUsage> {
579    let mut candidates = Vec::new();
580    find_usage_objects(raw, &mut candidates);
581    let mut usage = None;
582    for candidate in candidates {
583        usage = Some(if sum {
584            merge_usage_sum(usage, normalize_usage(&candidate))
585        } else {
586            merge_usage_right(usage, normalize_usage(&candidate))
587        });
588    }
589    usage
590}