Skip to main content

ito_core/harness/
opencode.rs

1use super::types::{Harness, HarnessName, HarnessRunConfig, HarnessRunResult};
2use miette::{Result, miette};
3use std::io::{BufRead, BufReader, Write};
4use std::process::{Command, Stdio};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::thread;
8use std::time::{Duration, Instant};
9
10/// Default inactivity timeout: 15 minutes
11pub const DEFAULT_INACTIVITY_TIMEOUT: Duration = Duration::from_secs(15 * 60);
12
13#[derive(Debug, Default)]
14/// Harness implementation that executes the `opencode` CLI.
15///
16/// This harness is intended for local development and CI where OpenCode is
17/// available on `PATH`.
18pub struct OpencodeHarness;
19
20impl Harness for OpencodeHarness {
21    fn name(&self) -> HarnessName {
22        HarnessName::OPENCODE
23    }
24
25    fn run(&mut self, config: &HarnessRunConfig) -> Result<HarnessRunResult> {
26        let mut cmd = Command::new("opencode");
27        cmd.arg("run");
28
29        if let Some(model) = config.model.as_deref() {
30            cmd.args(["-m", model]);
31        }
32
33        cmd.arg(&config.prompt);
34        cmd.current_dir(&config.cwd);
35        cmd.envs(&config.env);
36
37        // Use spawn with piped stdout/stderr for streaming output
38        cmd.stdout(Stdio::piped());
39        cmd.stderr(Stdio::piped());
40
41        let start = Instant::now();
42
43        let mut child = cmd
44            .spawn()
45            .map_err(|e| miette!("Failed to spawn opencode: {e}"))?;
46
47        let child_id = child.id();
48        let stdout_pipe = child.stdout.take();
49        let stderr_pipe = child.stderr.take();
50
51        // Track last activity time for timeout detection
52        let last_activity = Arc::new(std::sync::Mutex::new(Instant::now()));
53        let timed_out = Arc::new(AtomicBool::new(false));
54        let done = Arc::new(AtomicBool::new(false));
55
56        // Spawn thread to stream stdout
57        let last_activity_stdout = Arc::clone(&last_activity);
58        let stdout_handle =
59            thread::spawn(move || stream_pipe(stdout_pipe, &last_activity_stdout, true));
60
61        // Spawn thread to stream stderr
62        let last_activity_stderr = Arc::clone(&last_activity);
63        let stderr_handle =
64            thread::spawn(move || stream_pipe(stderr_pipe, &last_activity_stderr, false));
65
66        // Spawn timeout monitor thread if timeout is configured
67        let timeout = config
68            .inactivity_timeout
69            .unwrap_or(DEFAULT_INACTIVITY_TIMEOUT);
70        let last_activity_monitor = Arc::clone(&last_activity);
71        let timed_out_monitor = Arc::clone(&timed_out);
72        let done_monitor = Arc::clone(&done);
73
74        let monitor_handle = thread::spawn(move || {
75            monitor_timeout(
76                child_id,
77                timeout,
78                &last_activity_monitor,
79                &timed_out_monitor,
80                &done_monitor,
81            )
82        });
83
84        // Wait for process to complete
85        let status = child
86            .wait()
87            .map_err(|e| miette!("Failed to wait for opencode: {e}"))?;
88
89        done.store(true, Ordering::SeqCst);
90
91        // Wait for streaming threads to finish
92        let stdout = stdout_handle.join().unwrap_or_default();
93        let stderr = stderr_handle.join().unwrap_or_default();
94
95        // Stop the monitor thread (it will exit on next check since process is done)
96        let _ = monitor_handle.join();
97
98        let duration = start.elapsed();
99        let was_timed_out = timed_out.load(Ordering::SeqCst);
100
101        Ok(HarnessRunResult {
102            stdout,
103            stderr,
104            exit_code: if was_timed_out {
105                -1
106            } else {
107                status.code().unwrap_or(1)
108            },
109            duration,
110            timed_out: was_timed_out,
111        })
112    }
113
114    fn stop(&mut self) {
115        // No-op: `run` is synchronous.
116    }
117
118    fn streams_output(&self) -> bool {
119        true
120    }
121}
122
123/// Stream output from a pipe, updating last activity time on each line.
124fn stream_pipe(
125    pipe: Option<impl std::io::Read>,
126    last_activity: &std::sync::Mutex<Instant>,
127    is_stdout: bool,
128) -> String {
129    let mut collected = String::new();
130    if let Some(pipe) = pipe {
131        let reader = BufReader::new(pipe);
132        for line in reader.lines().map_while(Result::ok) {
133            // Update last activity time
134            if let Ok(mut last) = last_activity.lock() {
135                *last = Instant::now();
136            }
137
138            // Stream to console
139            if is_stdout {
140                println!("{}", line);
141                let _ = std::io::stdout().flush();
142            } else {
143                eprintln!("{}", line);
144                let _ = std::io::stderr().flush();
145            }
146
147            collected.push_str(&line);
148            collected.push('\n');
149        }
150    }
151    collected
152}
153
154/// Monitor for inactivity timeout and kill process if exceeded.
155fn monitor_timeout(
156    child_id: u32,
157    timeout: Duration,
158    last_activity: &std::sync::Mutex<Instant>,
159    timed_out: &AtomicBool,
160    done: &AtomicBool,
161) {
162    let check_interval = Duration::from_secs(1);
163
164    loop {
165        thread::sleep(check_interval);
166
167        if done.load(Ordering::SeqCst) {
168            break;
169        }
170
171        // Check if process is still running by trying to get last activity
172        let elapsed = match last_activity.lock() {
173            Ok(last) => last.elapsed(),
174            Err(_) => break, // Mutex poisoned, process likely done
175        };
176
177        if elapsed >= timeout {
178            eprintln!(
179                "\n=== Inactivity timeout ({:?}) reached, killing process... ===\n",
180                timeout
181            );
182            timed_out.store(true, Ordering::SeqCst);
183
184            // Kill the process
185            #[cfg(unix)]
186            {
187                let _ = std::process::Command::new("kill")
188                    .args(["-9", &child_id.to_string()])
189                    .status();
190            }
191            #[cfg(windows)]
192            {
193                let _ = std::process::Command::new("taskkill")
194                    .args(["/F", "/PID", &child_id.to_string()])
195                    .status();
196            }
197
198            break;
199        }
200
201        // Check if process has exited (mutex would be poisoned or we'd be waiting forever)
202        // The streaming threads will exit when the process exits, which will close the pipes
203    }
204}