1use std::collections::BTreeMap;
2use std::io::{self, Write};
3use std::path::PathBuf;
4use std::process::{Command, Stdio};
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, Mutex};
7use std::thread;
8use std::time::{Duration, Instant};
9
/// Description of a single external command invocation.
///
/// Create one with [`CommandSpec::new`] and refine it with the chainable
/// `with_*` methods; each of them consumes the spec and returns the updated one.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct CommandSpec {
    /// Program name followed by its arguments.
    pub argv: Vec<String>,
    /// Text intended for the child's standard input, if any.
    pub stdin_text: Option<String>,
    /// Working directory for the command; `None` leaves it unset.
    pub cwd: Option<PathBuf>,
    /// Extra environment variables, keyed by variable name.
    pub env: BTreeMap<String, String>,
    /// Timeout in seconds; `None` means no timeout was requested.
    pub timeout_secs: Option<u64>,
}

impl CommandSpec {
    /// Builds a spec from `argv`, converting every item into an owned `String`.
    ///
    /// Stdin text, working directory and timeout start out as `None`, and the
    /// environment map starts out empty.
    pub fn new<I, S>(argv: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let argv: Vec<String> = argv.into_iter().map(|arg| arg.into()).collect();
        Self {
            argv,
            stdin_text: None,
            cwd: None,
            env: BTreeMap::new(),
            timeout_secs: None,
        }
    }

    /// Returns the spec with `stdin_text` set, replacing any previous value.
    pub fn with_stdin(self, stdin_text: impl Into<String>) -> Self {
        Self {
            stdin_text: Some(stdin_text.into()),
            ..self
        }
    }

    /// Returns the spec with its working directory set to `cwd`.
    pub fn with_cwd(self, cwd: impl Into<PathBuf>) -> Self {
        Self {
            cwd: Some(cwd.into()),
            ..self
        }
    }

    /// Returns the spec with `key` mapped to `value` in the environment map.
    ///
    /// A later call with the same key overwrites the earlier value.
    pub fn with_env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let mut env = self.env;
        env.insert(key.into(), value.into());
        Self { env, ..self }
    }

    /// Returns the spec with a timeout of `secs` seconds.
    pub fn with_timeout_secs(self, secs: u64) -> Self {
        Self {
            timeout_secs: Some(secs),
            ..self
        }
    }
}
55
/// Outcome of running a command: what was run and what it produced.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct CompletedRun {
    /// The argument vector the run was started with.
    pub argv: Vec<String>,
    /// Exit code reported for the run.
    pub exit_code: i32,
    /// Captured standard output.
    pub stdout: String,
    /// Captured standard error.
    pub stderr: String,
    /// Whether the run was flagged as having timed out.
    pub timed_out: bool,
}

impl CompletedRun {
    /// Creates a result with the given exit code and captured output.
    ///
    /// `timed_out` starts as `false`; use [`CompletedRun::with_timed_out`] to change it.
    pub fn new(
        argv: Vec<String>,
        exit_code: i32,
        stdout: impl Into<String>,
        stderr: impl Into<String>,
    ) -> Self {
        let stdout: String = stdout.into();
        let stderr: String = stderr.into();
        Self {
            argv,
            exit_code,
            stdout,
            stderr,
            timed_out: false,
        }
    }

    /// Returns the result with its `timed_out` flag replaced by the given value.
    pub fn with_timed_out(self, timed_out: bool) -> Self {
        Self { timed_out, ..self }
    }
}
87
/// Shared, lockable callback handed to streaming executors.
///
/// It is invoked with two string slices; the default stream executor passes a
/// stream name (`"stdout"` or `"stderr"`) first and a chunk of text second.
type StreamCallback = Arc<Mutex<dyn FnMut(&str, &str) + Send>>;
/// Signature of the executor used for plain (non-streaming) runs.
type RunExecutor = dyn Fn(CommandSpec) -> CompletedRun + Send + Sync;
/// Signature of the executor used for streaming runs; it receives the callback to notify.
type StreamExecutor = dyn Fn(CommandSpec, StreamCallback) -> CompletedRun + Send + Sync;
91
92pub struct Runner {
93 executor: Box<RunExecutor>,
94 stream_executor: Box<StreamExecutor>,
95}
96
97impl Runner {
98 pub fn new() -> Self {
99 Self {
100 executor: Box::new(default_run_executor),
101 stream_executor: Box::new(default_stream_executor),
102 }
103 }
104
105 pub fn with_executor(executor: Box<RunExecutor>) -> Self {
106 Self {
107 executor,
108 stream_executor: Box::new(default_stream_executor),
109 }
110 }
111
112 pub fn with_stream_executor(stream_executor: Box<StreamExecutor>) -> Self {
113 Self {
114 executor: Box::new(default_run_executor),
115 stream_executor,
116 }
117 }
118
119 pub fn run(&self, spec: CommandSpec) -> CompletedRun {
120 if spec.timeout_secs.is_some() {
121 return self.stream(spec, |_, _| {});
122 }
123 (self.executor)(spec)
124 }
125
126 pub fn stream<F>(&self, spec: CommandSpec, on_event: F) -> CompletedRun
127 where
128 F: FnMut(&str, &str) + Send + 'static,
129 {
130 (self.stream_executor)(spec, Arc::new(Mutex::new(on_event)))
131 }
132}
133
134impl Default for Runner {
135 fn default() -> Self {
136 Self::new()
137 }
138}
139
140pub fn build_prompt_spec(prompt: &str) -> Result<CommandSpec, &'static str> {
141 let normalized_prompt = prompt.trim();
142 if normalized_prompt.is_empty() {
143 return Err("prompt must not be empty");
144 }
145 Ok(CommandSpec::new(["opencode", "run", normalized_prompt]))
146}
147
148fn default_run_executor(spec: CommandSpec) -> CompletedRun {
149 let mut command = build_command(&spec);
150 let output = command
151 .output()
152 .unwrap_or_else(|error| failed_output(&spec, error));
153 CompletedRun {
154 argv: spec.argv,
155 exit_code: output.status.code().unwrap_or(1),
156 stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
157 stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
158 timed_out: false,
159 }
160}
161
162fn default_stream_executor(spec: CommandSpec, callback: StreamCallback) -> CompletedRun {
163 let argv = spec.argv.clone();
164 let timeout = spec.timeout_secs;
165 let mut command = build_command(&spec);
166 command.stdout(Stdio::piped());
167 command.stderr(Stdio::piped());
168
169 let mut child = match command.spawn() {
170 Ok(child) => child,
171 Err(error) => {
172 let error_msg = format!(
173 "failed to start {}: {}",
174 spec.argv.first().map(|s| s.as_str()).unwrap_or("(unknown)"),
175 error
176 );
177 if let Ok(mut cb) = callback.lock() {
178 cb("stderr", &error_msg);
179 }
180 return CompletedRun {
181 argv,
182 exit_code: 1,
183 stdout: String::new(),
184 stderr: error_msg,
185 timed_out: false,
186 };
187 }
188 };
189
190 if let Some(stdin_text) = &spec.stdin_text {
191 if let Some(mut stdin) = child.stdin.take() {
192 let _ = stdin.write_all(stdin_text.as_bytes());
193 }
194 }
195
196 let stdout_pipe = child.stdout.take();
197 let stderr_pipe = child.stderr.take();
198
199 let cb_out = Arc::clone(&callback);
200 let stdout_thread = thread::spawn(move || {
201 let mut buf = String::new();
202 if let Some(pipe) = stdout_pipe {
203 use std::io::BufRead;
204 let reader = std::io::BufReader::new(pipe);
205 for line in reader.lines() {
206 match line {
207 Ok(text) => {
208 buf.push_str(&text);
209 buf.push('\n');
210 let chunk = format!("{text}\n");
211 if let Ok(mut cb) = cb_out.lock() {
212 cb("stdout", &chunk);
213 }
214 }
215 Err(_) => break,
216 }
217 }
218 }
219 buf
220 });
221
222 let cb_err = Arc::clone(&callback);
223 let stderr_thread = thread::spawn(move || {
224 let mut buf = String::new();
225 if let Some(pipe) = stderr_pipe {
226 use std::io::BufRead;
227 let reader = std::io::BufReader::new(pipe);
228 for line in reader.lines() {
229 match line {
230 Ok(text) => {
231 buf.push_str(&text);
232 buf.push('\n');
233 let chunk = format!("{text}\n");
234 if let Ok(mut cb) = cb_err.lock() {
235 cb("stderr", &chunk);
236 }
237 }
238 Err(_) => break,
239 }
240 }
241 }
242 buf
243 });
244
245 let timed_out_flag = Arc::new(AtomicBool::new(false));
246 let watchdog_stop = Arc::new(AtomicBool::new(false));
247 let child_handle = Arc::new(Mutex::new(child));
248 let watchdog_handle = timeout.map(|secs| {
249 let child_arc = Arc::clone(&child_handle);
250 let flag = Arc::clone(&timed_out_flag);
251 let stop = Arc::clone(&watchdog_stop);
252 thread::spawn(move || watchdog_run(secs, child_arc, flag, stop))
253 });
254
255 let stdout_buf = stdout_thread.join().unwrap_or_default();
256 let stderr_buf = stderr_thread.join().unwrap_or_default();
257
258 watchdog_stop.store(true, Ordering::SeqCst);
259 if let Some(handle) = watchdog_handle {
260 let _ = handle.join();
261 }
262
263 let status = {
264 let mut guard = child_handle.lock().unwrap();
265 guard.wait().unwrap_or_else(|error| {
266 exit_status_from_code(failed_output(&spec, error).status.code().unwrap_or(1))
267 })
268 };
269
270 CompletedRun {
271 argv,
272 exit_code: status.code().unwrap_or(1),
273 stdout: stdout_buf,
274 stderr: stderr_buf,
275 timed_out: timed_out_flag.load(Ordering::SeqCst),
276 }
277}
278
/// Kills `child` if it is still running `secs` seconds after this function starts.
///
/// The function polls `stop` at most every 100 ms and returns early, without
/// touching the child, as soon as `stop` is set. When the deadline passes and
/// the child has not exited, `timed_out` is set to `true` and the child is
/// killed; a child that already exited is left alone and `timed_out` is not
/// modified.
///
/// A `secs` value too large to be represented as a deadline is treated as
/// "never expires": the function then simply waits for `stop`.
fn watchdog_run(
    secs: u64,
    child: Arc<Mutex<std::process::Child>>,
    timed_out: Arc<AtomicBool>,
    stop: Arc<AtomicBool>,
) {
    const POLL_INTERVAL: Duration = Duration::from_millis(100);

    // `Instant + Duration` panics on overflow; `checked_add` lets a huge timeout
    // degrade to "no deadline" instead.
    let deadline = Instant::now().checked_add(Duration::from_secs(secs));
    loop {
        if stop.load(Ordering::SeqCst) {
            return;
        }
        let nap = match deadline {
            Some(at) => {
                let remaining = at.saturating_duration_since(Instant::now());
                if remaining.is_zero() {
                    break;
                }
                // Never sleep past the deadline.
                remaining.min(POLL_INTERVAL)
            }
            None => POLL_INTERVAL,
        };
        thread::sleep(nap);
    }

    // A poisoned lock still guards a usable `Child`; recover it rather than panic.
    let mut guard = child
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    if matches!(guard.try_wait(), Ok(None)) {
        timed_out.store(true, Ordering::SeqCst);
        let _ = guard.kill();
    }
}
301
302fn build_command(spec: &CommandSpec) -> Command {
303 let mut argv = spec.argv.iter();
304 let program = argv.next().cloned().unwrap_or_default();
305 let mut command = Command::new(program);
306 command.args(argv);
307 if let Some(cwd) = &spec.cwd {
308 command.current_dir(cwd);
309 }
310 command.envs(&spec.env);
311 command.stdin(if spec.stdin_text.is_some() {
312 Stdio::piped()
313 } else {
314 Stdio::null()
315 });
316 command
317}
318
319fn failed_output(spec: &CommandSpec, error: io::Error) -> std::process::Output {
320 let stderr = format!(
321 "failed to start {}: {}",
322 spec.argv.first().map(|s| s.as_str()).unwrap_or("(unknown)"),
323 error
324 )
325 .into_bytes();
326 std::process::Output {
327 status: exit_status_from_code(1),
328 stdout: Vec::new(),
329 stderr,
330 }
331}
332
/// Fabricates an `ExitStatus` that reports `code` as its exit code (Unix).
///
/// `from_raw` takes a raw wait status on Unix, so the code is shifted into the
/// position where the exit code is stored.
#[cfg(unix)]
fn exit_status_from_code(code: i32) -> std::process::ExitStatus {
    let wait_status = code << 8;
    <std::process::ExitStatus as ExitStatusExt>::from_raw(wait_status)
}

/// Fabricates an `ExitStatus` that reports `code` as its exit code (Windows).
///
/// `from_raw` takes the exit code as a `u32` here, so `code` is reinterpreted as-is.
#[cfg(windows)]
fn exit_status_from_code(code: i32) -> std::process::ExitStatus {
    let raw_code = code as u32;
    <std::process::ExitStatus as ExitStatusExt>::from_raw(raw_code)
}
342
343#[cfg(unix)]
344use std::os::unix::process::ExitStatusExt;
345
346#[cfg(windows)]
347use std::os::windows::process::ExitStatusExt;