Skip to main content

call_coding_clis/
exec.rs

1use std::collections::BTreeMap;
2use std::io::{self, Write};
3use std::path::PathBuf;
4use std::process::{Command, Stdio};
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, Mutex};
7use std::thread;
8use std::time::{Duration, Instant};
9
/// Description of one external command invocation.
///
/// Create a spec with [`CommandSpec::new`] and refine it with the chainable
/// `with_*` methods; each of them consumes the spec and returns the updated
/// value.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct CommandSpec {
    /// Program name followed by its arguments.
    pub argv: Vec<String>,
    /// Text to hand to the command on standard input, if any.
    pub stdin_text: Option<String>,
    /// Working directory for the command, if one was requested.
    pub cwd: Option<PathBuf>,
    /// Environment variables to set for the command, keyed by name.
    pub env: BTreeMap<String, String>,
    /// Time limit in seconds, if one was requested.
    pub timeout_secs: Option<u64>,
}

impl CommandSpec {
    /// Creates a spec from `argv` with no stdin text, working directory,
    /// environment entries or timeout.
    pub fn new<I, S>(argv: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let argv: Vec<String> = argv.into_iter().map(|arg| arg.into()).collect();
        Self {
            argv,
            stdin_text: None,
            cwd: None,
            env: BTreeMap::new(),
            timeout_secs: None,
        }
    }

    /// Returns the spec with `stdin_text` set, replacing any previous value.
    pub fn with_stdin(self, stdin_text: impl Into<String>) -> Self {
        Self {
            stdin_text: Some(stdin_text.into()),
            ..self
        }
    }

    /// Returns the spec with its working directory set to `cwd`.
    pub fn with_cwd(self, cwd: impl Into<PathBuf>) -> Self {
        Self {
            cwd: Some(cwd.into()),
            ..self
        }
    }

    /// Returns the spec with the environment entry `key` set to `value`.
    ///
    /// An existing entry with the same key is overwritten.
    pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let name = key.into();
        let contents = value.into();
        self.env.insert(name, contents);
        self
    }

    /// Returns the spec with its timeout set to `secs` seconds.
    pub fn with_timeout_secs(self, secs: u64) -> Self {
        Self {
            timeout_secs: Some(secs),
            ..self
        }
    }
}
55
/// Result of a command that has finished running.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct CompletedRun {
    /// The argument vector the run was started with.
    pub argv: Vec<String>,
    /// Exit code reported for the run.
    pub exit_code: i32,
    /// Captured standard output.
    pub stdout: String,
    /// Captured standard error.
    pub stderr: String,
    /// Whether the run was flagged as having timed out.
    pub timed_out: bool,
}

impl CompletedRun {
    /// Creates a result with `timed_out` set to `false`.
    pub fn new(
        argv: Vec<String>,
        exit_code: i32,
        stdout: impl Into<String>,
        stderr: impl Into<String>,
    ) -> Self {
        let stdout = stdout.into();
        let stderr = stderr.into();
        Self {
            argv,
            exit_code,
            stdout,
            stderr,
            timed_out: false,
        }
    }

    /// Returns the result with its `timed_out` flag replaced by `timed_out`.
    pub fn with_timed_out(self, timed_out: bool) -> Self {
        Self { timed_out, ..self }
    }
}
87
88type StreamCallback = Arc<Mutex<dyn FnMut(&str, &str) + Send>>;
89type RunExecutor = dyn Fn(CommandSpec) -> CompletedRun + Send + Sync;
90type StreamExecutor = dyn Fn(CommandSpec, StreamCallback) -> CompletedRun + Send + Sync;
91
92pub struct Runner {
93    executor: Box<RunExecutor>,
94    stream_executor: Box<StreamExecutor>,
95}
96
97impl Runner {
98    pub fn new() -> Self {
99        Self {
100            executor: Box::new(default_run_executor),
101            stream_executor: Box::new(default_stream_executor),
102        }
103    }
104
105    pub fn with_executor(executor: Box<RunExecutor>) -> Self {
106        Self {
107            executor,
108            stream_executor: Box::new(default_stream_executor),
109        }
110    }
111
112    pub fn with_stream_executor(stream_executor: Box<StreamExecutor>) -> Self {
113        Self {
114            executor: Box::new(default_run_executor),
115            stream_executor,
116        }
117    }
118
119    pub fn run(&self, spec: CommandSpec) -> CompletedRun {
120        if spec.timeout_secs.is_some() {
121            return self.stream(spec, |_, _| {});
122        }
123        (self.executor)(spec)
124    }
125
126    pub fn stream<F>(&self, spec: CommandSpec, on_event: F) -> CompletedRun
127    where
128        F: FnMut(&str, &str) + Send + 'static,
129    {
130        (self.stream_executor)(spec, Arc::new(Mutex::new(on_event)))
131    }
132}
133
134impl Default for Runner {
135    fn default() -> Self {
136        Self::new()
137    }
138}
139
140pub fn build_prompt_spec(prompt: &str) -> Result<CommandSpec, &'static str> {
141    let normalized_prompt = prompt.trim();
142    if normalized_prompt.is_empty() {
143        return Err("prompt must not be empty");
144    }
145    Ok(CommandSpec::new(["opencode", "run", normalized_prompt]))
146}
147
148fn default_run_executor(spec: CommandSpec) -> CompletedRun {
149    let mut command = build_command(&spec);
150    let output = command
151        .output()
152        .unwrap_or_else(|error| failed_output(&spec, error));
153    CompletedRun {
154        argv: spec.argv,
155        exit_code: output.status.code().unwrap_or(1),
156        stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
157        stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
158        timed_out: false,
159    }
160}
161
162fn default_stream_executor(spec: CommandSpec, callback: StreamCallback) -> CompletedRun {
163    let argv = spec.argv.clone();
164    let timeout = spec.timeout_secs;
165    let mut command = build_command(&spec);
166    command.stdout(Stdio::piped());
167    command.stderr(Stdio::piped());
168
169    let mut child = match command.spawn() {
170        Ok(child) => child,
171        Err(error) => {
172            let error_msg = format!(
173                "failed to start {}: {}",
174                spec.argv.first().map(|s| s.as_str()).unwrap_or("(unknown)"),
175                error
176            );
177            if let Ok(mut cb) = callback.lock() {
178                cb("stderr", &error_msg);
179            }
180            return CompletedRun {
181                argv,
182                exit_code: 1,
183                stdout: String::new(),
184                stderr: error_msg,
185                timed_out: false,
186            };
187        }
188    };
189
190    if let Some(stdin_text) = &spec.stdin_text {
191        if let Some(mut stdin) = child.stdin.take() {
192            let _ = stdin.write_all(stdin_text.as_bytes());
193        }
194    }
195
196    let stdout_pipe = child.stdout.take();
197    let stderr_pipe = child.stderr.take();
198
199    let cb_out = Arc::clone(&callback);
200    let stdout_thread = thread::spawn(move || {
201        let mut buf = String::new();
202        if let Some(pipe) = stdout_pipe {
203            use std::io::BufRead;
204            let reader = std::io::BufReader::new(pipe);
205            for line in reader.lines() {
206                match line {
207                    Ok(text) => {
208                        buf.push_str(&text);
209                        buf.push('\n');
210                        let chunk = format!("{text}\n");
211                        if let Ok(mut cb) = cb_out.lock() {
212                            cb("stdout", &chunk);
213                        }
214                    }
215                    Err(_) => break,
216                }
217            }
218        }
219        buf
220    });
221
222    let cb_err = Arc::clone(&callback);
223    let stderr_thread = thread::spawn(move || {
224        let mut buf = String::new();
225        if let Some(pipe) = stderr_pipe {
226            use std::io::BufRead;
227            let reader = std::io::BufReader::new(pipe);
228            for line in reader.lines() {
229                match line {
230                    Ok(text) => {
231                        buf.push_str(&text);
232                        buf.push('\n');
233                        let chunk = format!("{text}\n");
234                        if let Ok(mut cb) = cb_err.lock() {
235                            cb("stderr", &chunk);
236                        }
237                    }
238                    Err(_) => break,
239                }
240            }
241        }
242        buf
243    });
244
245    let timed_out_flag = Arc::new(AtomicBool::new(false));
246    let watchdog_stop = Arc::new(AtomicBool::new(false));
247    let child_handle = Arc::new(Mutex::new(child));
248    let watchdog_handle = timeout.map(|secs| {
249        let child_arc = Arc::clone(&child_handle);
250        let flag = Arc::clone(&timed_out_flag);
251        let stop = Arc::clone(&watchdog_stop);
252        thread::spawn(move || watchdog_run(secs, child_arc, flag, stop))
253    });
254
255    let stdout_buf = stdout_thread.join().unwrap_or_default();
256    let stderr_buf = stderr_thread.join().unwrap_or_default();
257
258    watchdog_stop.store(true, Ordering::SeqCst);
259    if let Some(handle) = watchdog_handle {
260        let _ = handle.join();
261    }
262
263    let status = {
264        let mut guard = child_handle.lock().unwrap();
265        guard.wait().unwrap_or_else(|error| {
266            exit_status_from_code(failed_output(&spec, error).status.code().unwrap_or(1))
267        })
268    };
269
270    CompletedRun {
271        argv,
272        exit_code: status.code().unwrap_or(1),
273        stdout: stdout_buf,
274        stderr: stderr_buf,
275        timed_out: timed_out_flag.load(Ordering::SeqCst),
276    }
277}
278
/// Kills `child` if it is still running `secs` seconds from now.
///
/// The function polls `stop` while it waits and returns without touching the
/// child as soon as `stop` is set. Once the deadline passes, a child that has
/// not exited yet is killed and `timed_out` is set to `true`; a child that
/// has already exited is left alone and `timed_out` is not modified.
///
/// A timeout too large to be represented as a deadline is treated as "never
/// expires": the function then only returns once `stop` is set.
fn watchdog_run(
    secs: u64,
    child: Arc<Mutex<std::process::Child>>,
    timed_out: Arc<AtomicBool>,
    stop: Arc<AtomicBool>,
) {
    /// Longest time to sleep between two checks of `stop`.
    const POLL_INTERVAL: Duration = Duration::from_millis(100);

    // `Instant + Duration` panics on overflow; `checked_add` lets huge
    // timeouts degrade to "no deadline" instead.
    let deadline = Instant::now().checked_add(Duration::from_secs(secs));

    loop {
        if stop.load(Ordering::SeqCst) {
            return;
        }
        let remaining = match deadline {
            Some(deadline) => deadline.saturating_duration_since(Instant::now()),
            None => POLL_INTERVAL,
        };
        if remaining.is_zero() {
            break;
        }
        // Never sleep past the deadline.
        thread::sleep(remaining.min(POLL_INTERVAL));
    }

    if stop.load(Ordering::SeqCst) {
        return;
    }

    // A poisoned lock still guards a usable `Child`; recover it rather than
    // panicking inside the watchdog thread.
    let mut guard = child
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    if matches!(guard.try_wait(), Ok(None)) {
        timed_out.store(true, Ordering::SeqCst);
        let _ = guard.kill();
    }
}
301
302fn build_command(spec: &CommandSpec) -> Command {
303    let mut argv = spec.argv.iter();
304    let program = argv.next().cloned().unwrap_or_default();
305    let mut command = Command::new(program);
306    command.args(argv);
307    if let Some(cwd) = &spec.cwd {
308        command.current_dir(cwd);
309    }
310    command.envs(&spec.env);
311    command.stdin(if spec.stdin_text.is_some() {
312        Stdio::piped()
313    } else {
314        Stdio::null()
315    });
316    command
317}
318
319fn failed_output(spec: &CommandSpec, error: io::Error) -> std::process::Output {
320    let stderr = format!(
321        "failed to start {}: {}",
322        spec.argv.first().map(|s| s.as_str()).unwrap_or("(unknown)"),
323        error
324    )
325    .into_bytes();
326    std::process::Output {
327        status: exit_status_from_code(1),
328        stdout: Vec::new(),
329        stderr,
330    }
331}
332
/// Creates an [`std::process::ExitStatus`] that reports `code` as its exit
/// code (Unix variant).
#[cfg(unix)]
fn exit_status_from_code(code: i32) -> std::process::ExitStatus {
    // `from_raw` takes a raw wait status; shifting by eight bits puts `code`
    // where `ExitStatus::code()` reads it back for a normally exited process.
    let raw_wait_status = code << 8;
    <std::process::ExitStatus as ExitStatusExt>::from_raw(raw_wait_status)
}

/// Creates an [`std::process::ExitStatus`] that reports `code` as its exit
/// code (Windows variant).
#[cfg(windows)]
fn exit_status_from_code(code: i32) -> std::process::ExitStatus {
    // On Windows `from_raw` takes the exit code itself, as an unsigned value.
    let raw_exit_code = code as u32;
    <std::process::ExitStatus as ExitStatusExt>::from_raw(raw_exit_code)
}
342
343#[cfg(unix)]
344use std::os::unix::process::ExitStatusExt;
345
346#[cfg(windows)]
347use std::os::windows::process::ExitStatusExt;