ito_core/harness/
opencode.rs1use super::types::{Harness, HarnessName, HarnessRunConfig, HarnessRunResult};
2use miette::{Result, miette};
3use std::io::{BufRead, BufReader, Write};
4use std::process::{Command, Stdio};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::thread;
8use std::time::{Duration, Instant};
9
10pub const DEFAULT_INACTIVITY_TIMEOUT: Duration = Duration::from_secs(15 * 60);
12
13#[derive(Debug, Default)]
14pub struct OpencodeHarness;
19
20impl Harness for OpencodeHarness {
21 fn name(&self) -> HarnessName {
22 HarnessName::OPENCODE
23 }
24
25 fn run(&mut self, config: &HarnessRunConfig) -> Result<HarnessRunResult> {
26 let mut cmd = Command::new("opencode");
27 cmd.arg("run");
28
29 if let Some(model) = config.model.as_deref() {
30 cmd.args(["-m", model]);
31 }
32
33 cmd.arg(&config.prompt);
34 cmd.current_dir(&config.cwd);
35 cmd.envs(&config.env);
36
37 cmd.stdout(Stdio::piped());
39 cmd.stderr(Stdio::piped());
40
41 let start = Instant::now();
42
43 let mut child = cmd
44 .spawn()
45 .map_err(|e| miette!("Failed to spawn opencode: {e}"))?;
46
47 let child_id = child.id();
48 let stdout_pipe = child.stdout.take();
49 let stderr_pipe = child.stderr.take();
50
51 let last_activity = Arc::new(std::sync::Mutex::new(Instant::now()));
53 let timed_out = Arc::new(AtomicBool::new(false));
54 let done = Arc::new(AtomicBool::new(false));
55
56 let last_activity_stdout = Arc::clone(&last_activity);
58 let stdout_handle =
59 thread::spawn(move || stream_pipe(stdout_pipe, &last_activity_stdout, true));
60
61 let last_activity_stderr = Arc::clone(&last_activity);
63 let stderr_handle =
64 thread::spawn(move || stream_pipe(stderr_pipe, &last_activity_stderr, false));
65
66 let timeout = config
68 .inactivity_timeout
69 .unwrap_or(DEFAULT_INACTIVITY_TIMEOUT);
70 let last_activity_monitor = Arc::clone(&last_activity);
71 let timed_out_monitor = Arc::clone(&timed_out);
72 let done_monitor = Arc::clone(&done);
73
74 let monitor_handle = thread::spawn(move || {
75 monitor_timeout(
76 child_id,
77 timeout,
78 &last_activity_monitor,
79 &timed_out_monitor,
80 &done_monitor,
81 )
82 });
83
84 let status = child
86 .wait()
87 .map_err(|e| miette!("Failed to wait for opencode: {e}"))?;
88
89 done.store(true, Ordering::SeqCst);
90
91 let stdout = stdout_handle.join().unwrap_or_default();
93 let stderr = stderr_handle.join().unwrap_or_default();
94
95 let _ = monitor_handle.join();
97
98 let duration = start.elapsed();
99 let was_timed_out = timed_out.load(Ordering::SeqCst);
100
101 Ok(HarnessRunResult {
102 stdout,
103 stderr,
104 exit_code: if was_timed_out {
105 -1
106 } else {
107 status.code().unwrap_or(1)
108 },
109 duration,
110 timed_out: was_timed_out,
111 })
112 }
113
114 fn stop(&mut self) {
115 }
117
118 fn streams_output(&self) -> bool {
119 true
120 }
121}
122
123fn stream_pipe(
125 pipe: Option<impl std::io::Read>,
126 last_activity: &std::sync::Mutex<Instant>,
127 is_stdout: bool,
128) -> String {
129 let mut collected = String::new();
130 if let Some(pipe) = pipe {
131 let reader = BufReader::new(pipe);
132 for line in reader.lines().map_while(Result::ok) {
133 if let Ok(mut last) = last_activity.lock() {
135 *last = Instant::now();
136 }
137
138 if is_stdout {
140 println!("{}", line);
141 let _ = std::io::stdout().flush();
142 } else {
143 eprintln!("{}", line);
144 let _ = std::io::stderr().flush();
145 }
146
147 collected.push_str(&line);
148 collected.push('\n');
149 }
150 }
151 collected
152}
153
154fn monitor_timeout(
156 child_id: u32,
157 timeout: Duration,
158 last_activity: &std::sync::Mutex<Instant>,
159 timed_out: &AtomicBool,
160 done: &AtomicBool,
161) {
162 let check_interval = Duration::from_secs(1);
163
164 loop {
165 thread::sleep(check_interval);
166
167 if done.load(Ordering::SeqCst) {
168 break;
169 }
170
171 let elapsed = match last_activity.lock() {
173 Ok(last) => last.elapsed(),
174 Err(_) => break, };
176
177 if elapsed >= timeout {
178 eprintln!(
179 "\n=== Inactivity timeout ({:?}) reached, killing process... ===\n",
180 timeout
181 );
182 timed_out.store(true, Ordering::SeqCst);
183
184 #[cfg(unix)]
186 {
187 let _ = std::process::Command::new("kill")
188 .args(["-9", &child_id.to_string()])
189 .status();
190 }
191 #[cfg(windows)]
192 {
193 let _ = std::process::Command::new("taskkill")
194 .args(["/F", "/PID", &child_id.to_string()])
195 .status();
196 }
197
198 break;
199 }
200
201 }
204}