1use super::common::*;
2use super::types::*;
3use anyhow::{bail, Context};
4use serde_json::{json, Value};
5use std::collections::HashMap;
6use std::path::PathBuf;
7use std::process::Stdio;
8use std::time::Duration;
9use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
10use tokio::process::{Child, Command};
11use tokio::sync::mpsc;
12
/// Settings that control how the OpenCode CLI and its HTTP server are launched.
#[derive(Debug, Clone)]
pub struct OpenCodeAgentProviderOptions {
    /// Executable to launch; `opencode` is used when this is `None`.
    pub command: Option<String>,
    /// Leading arguments for the one-shot CLI invocation.
    pub subcommand: Vec<String>,
    /// Extra arguments placed right after `subcommand`.
    pub args: Vec<String>,
    /// Leading arguments used when starting the HTTP server.
    pub server_subcommand: Vec<String>,
    /// Extra arguments placed right after `server_subcommand`.
    pub server_args: Vec<String>,
    /// Sent as `retryCount` in the structured-output request format.
    pub structured_output_retry_count: u64,
    /// How long to wait, in milliseconds, for the server to announce its URL.
    pub server_startup_timeout_ms: u64,
    /// Working directory used when the run input does not provide one.
    pub cwd: Option<PathBuf>,
    /// Environment variables handed to the spawned process.
    pub env: HashMap<String, String>,
    /// Optional timeout, in milliseconds, handed to the one-shot CLI invocation.
    pub timeout_ms: Option<u64>,
}

impl Default for OpenCodeAgentProviderOptions {
    /// `run` / `serve` subcommands, two structured-output retries, a 15 second
    /// server startup window, and everything else empty or unset.
    fn default() -> Self {
        let subcommand = vec![String::from("run")];
        let server_subcommand = vec![String::from("serve")];
        Self {
            subcommand,
            server_subcommand,
            structured_output_retry_count: 2,
            server_startup_timeout_ms: 15_000,
            command: None,
            cwd: None,
            timeout_ms: None,
            args: Vec::new(),
            server_args: Vec::new(),
            env: HashMap::new(),
        }
    }
}
43
44#[derive(Debug, Clone, Default)]
45pub struct OpenCodeAgentProvider {
46 options: OpenCodeAgentProviderOptions,
47}
48
49impl OpenCodeAgentProvider {
50 pub fn new(options: OpenCodeAgentProviderOptions) -> Self {
51 Self { options }
52 }
53}
54
55#[async_trait::async_trait]
56impl AgentProvider for OpenCodeAgentProvider {
57 fn name(&self) -> &str {
58 "opencode"
59 }
60 fn schema_mode(&self) -> AgentProviderSchemaMode {
61 AgentProviderSchemaMode::Builtin
62 }
63 fn usage_mode(&self) -> AgentProviderUsageMode {
64 AgentProviderUsageMode::Builtin
65 }
66 async fn run(&self, input: AgentProviderRunInput) -> anyhow::Result<AgentProviderResult> {
67 if option_schema(&input.options).is_some() {
68 run_opencode_structured(input, &self.options).await
69 } else {
70 run_opencode(input, &self.options).await
71 }
72 }
73}
74
75const MAX_PROMPT_ARG_LENGTH: usize = 32_000;
76
77async fn run_opencode(
78 input: AgentProviderRunInput,
79 options: &OpenCodeAgentProviderOptions,
80) -> anyhow::Result<AgentProviderResult> {
81 if input.prompt.len() > MAX_PROMPT_ARG_LENGTH {
82 return run_opencode_via_server(input, options).await;
83 }
84
85 let command = options.command.as_deref().unwrap_or("opencode");
86 let mut args = Vec::new();
87 args.extend(options.subcommand.clone());
88 args.extend(options.args.clone());
89 args.extend(["--format".into(), "json".into()]);
90 if let Some(model) = option_str(&input.options, "model") {
91 args.extend(["--model".into(), model]);
92 }
93 if let Some(thinking) = option_str(&input.options, "thinking") {
94 args.extend(["--variant".into(), thinking]);
95 }
96 if let Some(agent_type) = option_str(&input.options, "agentType") {
97 args.extend(["--agent".into(), agent_type]);
98 }
99 args.push(input.prompt.clone());
100 let cwd = input.context.cwd.as_deref().or(options.cwd.as_deref());
101 let (stdout, stderr) = run_command(
102 "OpenCode",
103 command,
104 &args,
105 None,
106 cwd,
107 &options.env,
108 options.timeout_ms,
109 )
110 .await?;
111 let parsed = parse_output(&stdout);
112 let events = match &parsed {
113 Value::Array(items) => items.clone(),
114 value => vec![value.clone()],
115 };
116 let candidate = extract_output(&parsed).unwrap_or(stdout);
117 let session_id = extract_session_id(&parsed)
118 .context("OpenCode provider response did not include a session id")?;
119 Ok(AgentProviderResult {
120 output: Value::String(candidate.trim_end().to_string()),
121 session_id: Some(session_id),
122 model: extract_model(&parsed).or_else(|| option_model(&input.options)),
123 usage: extract_usage(&parsed, true),
124 isolation: None,
125 raw: Some(to_json_value(
126 json!({ "events": events, "response": parsed, "stderr": stderr }),
127 )),
128 })
129}
130
131async fn run_opencode_via_server(
132 input: AgentProviderRunInput,
133 options: &OpenCodeAgentProviderOptions,
134) -> anyhow::Result<AgentProviderResult> {
135 let command = options.command.as_deref().unwrap_or("opencode");
136 let mut server = start_opencode_server(command, options, &input).await?;
137 let directory = input
138 .context
139 .cwd
140 .as_ref()
141 .or(options.cwd.as_ref())
142 .cloned()
143 .unwrap_or(std::env::current_dir()?);
144 let mut session_body = json!({
145 "title": "smol-workflows agent call",
146 });
147 if let Some(agent_type) = option_str(&input.options, "agentType") {
148 session_body["agent"] = Value::String(agent_type);
149 }
150 let session = request_json(
151 &server.url,
152 "/session",
153 "POST",
154 &[("directory", directory.to_string_lossy().to_string())],
155 &session_body,
156 )
157 .await?;
158 let session_id = extract_session_id(&session)
159 .or_else(|| {
160 session
161 .get("id")
162 .and_then(Value::as_str)
163 .map(ToString::to_string)
164 })
165 .ok_or_else(|| {
166 anyhow::anyhow!(
167 "OpenCode create-session response did not include a session id: {session}"
168 )
169 })?;
170
171 let model = option_str(&input.options, "model")
172 .map(|model| split_model(&model))
173 .transpose()?;
174 let mut body = json!({
175 "parts": [{ "type": "text", "text": input.prompt }],
176 });
177 if let Some(model) = model {
178 body["model"] = model;
179 }
180 if let Some(thinking) = option_str(&input.options, "thinking") {
181 body["variant"] = Value::String(thinking);
182 }
183 if let Some(agent_type) = option_str(&input.options, "agentType") {
184 body["agent"] = Value::String(agent_type);
185 }
186 let response = request_json(
187 &server.url,
188 &format!("/session/{}/message", url_encode(&session_id)),
189 "POST",
190 &[("directory", directory.to_string_lossy().to_string())],
191 &body,
192 )
193 .await?;
194 let output = extract_output(&response).ok_or_else(|| {
195 anyhow::anyhow!("OpenCode response did not include a final assistant message")
196 })?;
197 let logs = server.logs.clone();
198 server.stop().await;
199 Ok(AgentProviderResult {
200 output: Value::String(output.trim_end().to_string()),
201 session_id: Some(session_id),
202 model: extract_model(&response)
203 .or_else(|| extract_model(&session))
204 .or_else(|| option_model(&input.options)),
205 usage: extract_usage(&response, true),
206 isolation: None,
207 raw: Some(to_json_value(
208 json!({ "events": [session, response], "session": session, "response": response, "serverLogs": logs }),
209 )),
210 })
211}
212
213async fn run_opencode_structured(
214 input: AgentProviderRunInput,
215 options: &OpenCodeAgentProviderOptions,
216) -> anyhow::Result<AgentProviderResult> {
217 let command = options.command.as_deref().unwrap_or("opencode");
218 let mut server = start_opencode_server(command, options, &input).await?;
219 let directory = input
220 .context
221 .cwd
222 .as_ref()
223 .or(options.cwd.as_ref())
224 .cloned()
225 .unwrap_or(std::env::current_dir()?);
226 let mut session_body = json!({
227 "title": "smol-workflows structured output",
228 });
229 if let Some(agent_type) = option_str(&input.options, "agentType") {
230 session_body["agent"] = Value::String(agent_type);
231 }
232 let session = request_json(
233 &server.url,
234 "/session",
235 "POST",
236 &[("directory", directory.to_string_lossy().to_string())],
237 &session_body,
238 )
239 .await?;
240 let session_id = extract_session_id(&session)
241 .or_else(|| {
242 session
243 .get("id")
244 .and_then(Value::as_str)
245 .map(ToString::to_string)
246 })
247 .ok_or_else(|| {
248 anyhow::anyhow!(
249 "OpenCode create-session response did not include a session id: {session}"
250 )
251 })?;
252
253 let model = option_str(&input.options, "model")
254 .map(|model| split_model(&model))
255 .transpose()?;
256 let mut body = json!({
257 "parts": [{ "type": "text", "text": input.prompt }],
258 "format": {
259 "type": "json_schema",
260 "schema": option_schema(&input.options).cloned(),
261 "retryCount": options.structured_output_retry_count,
262 }
263 });
264 if let Some(model) = model {
265 body["model"] = model;
266 }
267 if let Some(thinking) = option_str(&input.options, "thinking") {
268 body["variant"] = Value::String(thinking);
269 }
270 if let Some(agent_type) = option_str(&input.options, "agentType") {
271 body["agent"] = Value::String(agent_type);
272 }
273 let response = request_json(
274 &server.url,
275 &format!("/session/{}/message", url_encode(&session_id)),
276 "POST",
277 &[("directory", directory.to_string_lossy().to_string())],
278 &body,
279 )
280 .await?;
281 let output = extract_structured_output(&response).ok_or_else(|| {
282 anyhow::anyhow!("OpenCode structured-output response did not include a structured value")
283 })?;
284 let logs = server.logs.clone();
285 server.stop().await;
286 Ok(AgentProviderResult {
287 output,
288 session_id: Some(session_id),
289 model: extract_model(&response)
290 .or_else(|| extract_model(&session))
291 .or_else(|| option_model(&input.options)),
292 usage: extract_usage(&response, true),
293 isolation: None,
294 raw: Some(to_json_value(
295 json!({ "events": [session, response], "session": session, "response": response, "serverLogs": logs }),
296 )),
297 })
298}
299
300struct OpenCodeServer {
301 child: Child,
302 url: String,
303 logs: String,
304}
305impl OpenCodeServer {
306 async fn stop(&mut self) {
307 let _ = self.child.start_kill();
308 let _ = self.child.wait().await;
309 }
310}
311impl Drop for OpenCodeServer {
312 fn drop(&mut self) {
313 let _ = self.child.start_kill();
314 }
315}
316
317async fn start_opencode_server(
318 command: &str,
319 options: &OpenCodeAgentProviderOptions,
320 input: &AgentProviderRunInput,
321) -> anyhow::Result<OpenCodeServer> {
322 let mut args = Vec::new();
323 args.extend(options.server_subcommand.clone());
324 args.extend(options.server_args.clone());
325 args.extend([
326 "--hostname".into(),
327 "127.0.0.1".into(),
328 "--port".into(),
329 "0".into(),
330 ]);
331 let mut cmd = Command::new(command);
332 cmd.args(&args)
333 .stdout(Stdio::piped())
334 .stderr(Stdio::piped())
335 .stdin(Stdio::null());
336 if let Some(cwd) = input.context.cwd.as_ref().or(options.cwd.as_ref()) {
337 cmd.current_dir(cwd);
338 }
339 cmd.envs(&options.env);
340 let mut child = cmd.spawn().context("failed to spawn OpenCode server")?;
341 let stdout = child
342 .stdout
343 .take()
344 .context("failed to capture OpenCode server stdout")?;
345 let stderr = child
346 .stderr
347 .take()
348 .context("failed to capture OpenCode server stderr")?;
349 let (tx, mut rx) = mpsc::unbounded_channel::<String>();
350 spawn_reader(stdout, tx.clone());
351 spawn_reader(stderr, tx);
352 let deadline =
353 tokio::time::Instant::now() + Duration::from_millis(options.server_startup_timeout_ms);
354 let mut logs = String::new();
355
356 loop {
357 if let Some(status) = child.try_wait()? {
358 bail!(
359 "OpenCode server exited before it was ready with code {:?}{}",
360 status.code(),
361 if logs.is_empty() {
362 String::new()
363 } else {
364 format!(": {}", truncate(&logs, 4000))
365 }
366 );
367 }
368 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
369 if remaining.is_zero() {
370 let _ = child.start_kill();
371 bail!(
372 "Timed out waiting for OpenCode server URL{}",
373 if logs.is_empty() {
374 String::new()
375 } else {
376 format!(": {}", truncate(&logs, 4000))
377 }
378 );
379 }
380 tokio::select! {
381 Some(chunk) = rx.recv() => {
382 logs.push_str(&chunk);
383 if let Some(url) = extract_server_url(&logs) {
384 return Ok(OpenCodeServer { child, url, logs });
385 }
386 }
387 _ = tokio::time::sleep(remaining.min(Duration::from_millis(50))) => {}
388 }
389 }
390}
391
392fn spawn_reader<R: AsyncRead + Unpin + Send + 'static>(
393 reader: R,
394 tx: mpsc::UnboundedSender<String>,
395) {
396 tokio::spawn(async move {
397 let mut lines = BufReader::new(reader).lines();
398 while let Ok(Some(line)) = lines.next_line().await {
399 let _ = tx.send(format!("{line}\n"));
400 }
401 });
402}
403
/// Returns the whitespace-delimited token that follows the first
/// `opencode server listening on ` marker in `logs`, if any.
fn extract_server_url(logs: &str) -> Option<String> {
    const MARKER: &str = "opencode server listening on ";
    let (_, after_marker) = logs.split_once(MARKER)?;
    after_marker.split_whitespace().next().map(str::to_owned)
}
410
411async fn request_json(
412 base: &str,
413 path: &str,
414 method: &str,
415 query: &[(impl AsRef<str>, String)],
416 body: &Value,
417) -> anyhow::Result<Value> {
418 if method != "POST" {
419 bail!("unsupported method {method}");
420 }
421
422 let url = build_url(base, path, query);
423 let response = reqwest::Client::new().post(url).json(body).send().await?;
424 let status = response.status();
425 let text = response.text().await?;
426 if !status.is_success() {
427 bail!(
428 "OpenCode {method} {path} failed with HTTP {status}: {}",
429 if text.trim().is_empty() {
430 "<empty response body>".to_string()
431 } else {
432 truncate(&text, 4000)
433 }
434 );
435 }
436 Ok(if text.trim().is_empty() {
437 Value::Null
438 } else {
439 serde_json::from_str(&text)?
440 })
441}
442
443fn build_url(base: &str, path: &str, query: &[(impl AsRef<str>, String)]) -> String {
444 let mut url = format!("{}{}", base.trim_end_matches('/'), path);
445 if !query.is_empty() {
446 url.push('?');
447 url.push_str(
448 &query
449 .iter()
450 .map(|(key, value)| format!("{}={}", key.as_ref(), url_encode(value)))
451 .collect::<Vec<_>>()
452 .join("&"),
453 );
454 }
455 url
456}
457
/// Percent-encodes `value` for use as a URL path segment or query value.
///
/// Every byte outside the RFC 3986 unreserved set (`A-Z a-z 0-9 - . _ ~`) is
/// written as `%XX` with uppercase hex digits. Escaping only a few characters
/// would let `&`, `#`, `?`, `+`, `=` or non-ASCII bytes in a directory name or
/// session id change the meaning of the URL.
fn url_encode(value: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";
    let mut encoded = String::with_capacity(value.len());
    for byte in value.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                encoded.push(char::from(byte));
            }
            _ => {
                encoded.push('%');
                encoded.push(char::from(HEX[usize::from(byte >> 4)]));
                encoded.push(char::from(HEX[usize::from(byte & 0x0F)]));
            }
        }
    }
    encoded
}
464
465fn split_model(model: &str) -> anyhow::Result<Value> {
466 let Some((provider, model_id)) = model.split_once('/') else {
467 bail!("OpenCode model must use provider/model form for structured output, got: {model}")
468 };
469 if provider.is_empty() || model_id.is_empty() {
470 bail!("OpenCode model must use provider/model form for structured output, got: {model}");
471 }
472 Ok(json!({ "providerID": provider, "modelID": model_id }))
473}
474
475fn parse_output(stdout: &str) -> Value {
476 let trimmed = stdout.trim();
477 if trimmed.is_empty() {
478 return Value::String(String::new());
479 }
480 serde_json::from_str(trimmed).unwrap_or_else(|_| {
481 let events = parse_json_lines(stdout);
482 if events.is_empty() {
483 Value::String(stdout.to_string())
484 } else {
485 Value::Array(events)
486 }
487 })
488}
489
490fn extract_structured_output(value: &Value) -> Option<Value> {
491 match value {
492 Value::Array(items) => items.iter().find_map(extract_structured_output),
493 Value::Object(record) => {
494 for key in ["structured", "structured_output", "structuredOutput"] {
495 if record.contains_key(key) {
496 return record.get(key).cloned();
497 }
498 }
499 if record.get("type").and_then(Value::as_str) == Some("tool")
500 && record.get("tool").and_then(Value::as_str) == Some("StructuredOutput")
501 {
502 if let Some(input) = get_path(value, &["state", "input"]) {
503 return Some(input.clone());
504 }
505 }
506 record.values().find_map(extract_structured_output)
507 }
508 _ => None,
509 }
510}
511
512fn extract_output(raw: &Value) -> Option<String> {
513 match raw {
514 Value::String(text) => Some(text.clone()),
515 Value::Array(items) => items.iter().rev().find_map(extract_output),
516 Value::Object(record) => {
517 if record.get("type").and_then(Value::as_str) == Some("text") {
518 if let Some(text) = record.get("part").and_then(extract_text) {
519 return Some(text);
520 }
521 }
522 for key in ["result", "output", "text", "message", "content", "parts"] {
523 if let Some(text) = record.get(key).and_then(extract_text) {
524 if !text.is_empty() {
525 return Some(text);
526 }
527 }
528 }
529 for key in ["data", "item", "event", "properties"] {
530 if let Some(value) = record.get(key).and_then(extract_output) {
531 if !value.is_empty() {
532 return Some(value);
533 }
534 }
535 }
536 None
537 }
538 _ => None,
539 }
540}
541
542fn extract_text(value: &Value) -> Option<String> {
543 match value {
544 Value::String(text) => Some(text.clone()),
545 Value::Array(items) => {
546 let text = items
547 .iter()
548 .map(|item| extract_text(item).unwrap_or_default())
549 .collect::<Vec<_>>()
550 .join("");
551 (!text.is_empty()).then_some(text)
552 }
553 Value::Object(record) => record
554 .get("text")
555 .or_else(|| record.get("content"))
556 .or_else(|| record.get("message"))
557 .or_else(|| record.get("parts"))
558 .and_then(extract_text),
559 _ => None,
560 }
561}
562
563fn extract_session_id(raw: &Value) -> Option<String> {
564 match raw {
565 Value::Array(items) => items.iter().find_map(extract_session_id),
566 Value::Object(record) => {
567 for key in ["sessionID", "sessionId", "session_id"] {
568 if let Some(value) = record.get(key).and_then(Value::as_str) {
569 return Some(value.to_string());
570 }
571 }
572 record.values().find_map(extract_session_id)
573 }
574 _ => None,
575 }
576}
577
578fn extract_usage(raw: &Value, sum: bool) -> Option<AgentUsage> {
579 let mut candidates = Vec::new();
580 find_usage_objects(raw, &mut candidates);
581 let mut usage = None;
582 for candidate in candidates {
583 usage = Some(if sum {
584 merge_usage_sum(usage, normalize_usage(&candidate))
585 } else {
586 merge_usage_right(usage, normalize_usage(&candidate))
587 });
588 }
589 usage
590}