1use std::process::Stdio;
2use std::time::Duration;
3use tokio::io::AsyncWriteExt;
4use tokio::time::timeout;
5
6use crate::error::RunnerError;
7use crate::ring_buffer::RingBuffer;
8use crate::types::RunnerMode;
9
10use super::io::{PipeReadError, drain_pipes, read_pipes_until_exit};
11use super::platform;
12use super::{BufferConfig, ClaudeResponse, NdjsonResult, WslOptions};
13
14#[derive(Debug, Clone)]
16pub struct Runner {
17 pub mode: RunnerMode,
19 pub wsl_options: WslOptions,
21 pub buffer_config: BufferConfig,
23}
24
25impl Runner {
26 #[must_use]
28 pub fn new(mode: RunnerMode, wsl_options: WslOptions) -> Self {
29 Self {
30 mode,
31 wsl_options,
32 buffer_config: BufferConfig::default(),
33 }
34 }
35
36 #[must_use]
38 pub const fn with_buffer_config(
39 mode: RunnerMode,
40 wsl_options: WslOptions,
41 buffer_config: BufferConfig,
42 ) -> Self {
43 Self {
44 mode,
45 wsl_options,
46 buffer_config,
47 }
48 }
49
50 #[must_use]
62 pub fn parse_ndjson(stdout: &str) -> NdjsonResult {
63 crate::ndjson::parse_ndjson(stdout)
64 }
65
66 #[must_use]
68 #[allow(dead_code)] pub fn native() -> Self {
70 Self {
71 mode: RunnerMode::Native,
72 wsl_options: WslOptions {
73 distro: None,
74 claude_path: None,
75 },
76 buffer_config: BufferConfig::default(),
77 }
78 }
79
80 #[allow(dead_code)] pub fn auto() -> Result<Self, RunnerError> {
93 Ok(Self {
94 mode: RunnerMode::Auto,
95 wsl_options: WslOptions::default(),
96 buffer_config: BufferConfig::default(),
97 })
98 }
99
100 pub async fn execute_claude(
105 &self,
106 args: &[String],
107 stdin_content: &str,
108 timeout_duration: Option<Duration>,
109 ) -> Result<ClaudeResponse, RunnerError> {
110 let actual_mode = match self.mode {
112 RunnerMode::Auto => Self::detect_auto()?,
113 mode => mode,
114 };
115
116 match actual_mode {
118 RunnerMode::Native | RunnerMode::Auto => {
119 self.execute_native(args, stdin_content, timeout_duration)
120 .await
121 }
122 RunnerMode::Wsl => {
123 self.execute_wsl(args, stdin_content, timeout_duration)
124 .await
125 }
126 }
127 }
128
129 async fn execute_native(
131 &self,
132 args: &[String],
133 stdin_content: &str,
134 timeout_duration: Option<Duration>,
135 ) -> Result<ClaudeResponse, RunnerError> {
136 #[allow(unused_mut)]
137 let mut cmd = self.native_command_spec(args).to_tokio_command();
138
139 #[cfg(unix)]
141 {
142 #[allow(unused_imports)]
143 use std::os::unix::process::CommandExt;
144 unsafe {
145 cmd.pre_exec(|| {
146 libc::setpgid(0, 0);
148 Ok(())
149 });
150 }
151 }
152
153 self.execute_with_command(
154 cmd,
155 RunnerMode::Native,
156 "claude",
157 stdin_content,
158 timeout_duration,
159 )
160 .await
161 }
162
163 async fn execute_wsl(
165 &self,
166 args: &[String],
167 stdin_content: &str,
168 timeout_duration: Option<Duration>,
169 ) -> Result<ClaudeResponse, RunnerError> {
170 let cmd = self.wsl_command_spec(args).to_tokio_command();
171
172 let mut response = self
173 .execute_with_command(cmd, RunnerMode::Wsl, "wsl", stdin_content, timeout_duration)
174 .await?;
175 response.runner_distro = self.get_wsl_distro_name();
176 Ok(response)
177 }
178
179 async fn execute_with_command(
180 &self,
181 mut cmd: tokio::process::Command,
182 runner_used: RunnerMode,
183 label: &str,
184 stdin_content: &str,
185 timeout_duration: Option<Duration>,
186 ) -> Result<ClaudeResponse, RunnerError> {
187 cmd.stdin(Stdio::piped())
188 .stdout(Stdio::piped())
189 .stderr(Stdio::piped());
190
191 #[cfg(windows)]
193 let job = platform::create_job_object()?;
194
195 let mut child = cmd.spawn().map_err(|e| {
196 execution_failed(runner_used, format!("Failed to spawn {label} process: {e}"))
197 })?;
198
199 #[cfg(windows)]
201 platform::assign_to_job(&job, &child)?;
202
203 if let Some(mut stdin) = child.stdin.take() {
205 stdin
206 .write_all(stdin_content.as_bytes())
207 .await
208 .map_err(|e| {
209 execution_failed(
210 runner_used,
211 format!("Failed to write to {label} stdin: {e}"),
212 )
213 })?;
214 drop(stdin); }
216
217 let mut stdout_pipe = child
219 .stdout
220 .take()
221 .ok_or_else(|| execution_failed(runner_used, "Failed to capture stdout".to_string()))?;
222 let mut stderr_pipe = child
223 .stderr
224 .take()
225 .ok_or_else(|| execution_failed(runner_used, "Failed to capture stderr".to_string()))?;
226
227 let mut stdout_buffer = RingBuffer::new(self.buffer_config.stdout_cap_bytes);
229 let mut stderr_buffer = RingBuffer::new(self.buffer_config.stderr_cap_bytes);
230
231 let status = if let Some(duration) = timeout_duration {
232 let child_id = child.id();
234
235 let read_future = read_pipes_until_exit(
236 &mut child,
237 &mut stdout_pipe,
238 &mut stderr_pipe,
239 &mut stdout_buffer,
240 &mut stderr_buffer,
241 );
242
243 match timeout(duration, read_future).await {
244 Ok(result) => result.map_err(|err| map_pipe_error(runner_used, err))?,
245 Err(_) => {
246 if let Some(pid) = child_id {
248 platform::terminate_process_by_pid(pid, duration).await?;
249 }
250
251 let _ = drain_pipes(
253 &mut stdout_pipe,
254 &mut stderr_pipe,
255 &mut stdout_buffer,
256 &mut stderr_buffer,
257 )
258 .await;
259
260 return Err(RunnerError::Timeout {
262 timeout_seconds: duration.as_secs(),
263 });
264 }
265 }
266 } else {
267 read_pipes_until_exit(
268 &mut child,
269 &mut stdout_pipe,
270 &mut stderr_pipe,
271 &mut stdout_buffer,
272 &mut stderr_buffer,
273 )
274 .await
275 .map_err(|err| map_pipe_error(runner_used, err))?
276 };
277
278 let stdout = stdout_buffer.to_string();
279 let stderr = stderr_buffer.to_string();
280 let ndjson_result = Self::parse_ndjson(&stdout);
281
282 Ok(ClaudeResponse {
283 stdout,
284 stderr,
285 exit_code: status.code().unwrap_or(-1),
286 runner_used,
287 runner_distro: None,
288 timed_out: false,
289 ndjson_result,
290 stdout_truncated: stdout_buffer.was_truncated(),
291 stderr_truncated: stderr_buffer.was_truncated(),
292 stdout_total_bytes: stdout_buffer.total_bytes_written(),
293 stderr_total_bytes: stderr_buffer.total_bytes_written(),
294 })
295 }
296}
297
298impl Default for Runner {
299 fn default() -> Self {
300 Self {
301 mode: RunnerMode::Auto,
302 wsl_options: WslOptions::default(),
303 buffer_config: BufferConfig::default(),
304 }
305 }
306}
307
308fn execution_failed(runner_used: RunnerMode, reason: String) -> RunnerError {
309 match runner_used {
310 RunnerMode::Native => RunnerError::NativeExecutionFailed { reason },
311 RunnerMode::Wsl => RunnerError::WslExecutionFailed { reason },
312 RunnerMode::Auto => RunnerError::NativeExecutionFailed { reason },
313 }
314}
315
316fn map_pipe_error(runner_used: RunnerMode, error: PipeReadError) -> RunnerError {
317 match error {
318 PipeReadError::Stdout(err) => {
319 execution_failed(runner_used, format!("Failed to read stdout: {err}"))
320 }
321 PipeReadError::Stderr(err) => {
322 execution_failed(runner_used, format!("Failed to read stderr: {err}"))
323 }
324 PipeReadError::Wait(err) => {
325 execution_failed(runner_used, format!("Failed to wait for process: {err}"))
326 }
327 }
328}
329
330#[cfg(test)]
331mod tests {
332 use super::Runner;
333 use crate::claude::BufferConfig;
334 use crate::claude::WslOptions;
335 use crate::types::RunnerMode;
336
337 #[test]
338 fn test_runner_creation() {
339 let runner = Runner::new(RunnerMode::Native, WslOptions::default());
340 assert_eq!(runner.mode, RunnerMode::Native);
341 }
342
343 #[test]
344 fn test_runner_default() {
345 let runner = Runner::default();
346 assert_eq!(runner.mode, RunnerMode::Auto);
347 }
348
349 #[test]
350 fn test_runner_with_buffer_config() {
351 let buffer_config = BufferConfig {
352 stdout_cap_bytes: 1024,
353 stderr_cap_bytes: 512,
354 stderr_receipt_cap_bytes: 256,
355 };
356 let runner =
357 Runner::with_buffer_config(RunnerMode::Native, WslOptions::default(), buffer_config);
358 assert_eq!(runner.buffer_config.stdout_cap_bytes, 1024);
359 assert_eq!(runner.buffer_config.stderr_cap_bytes, 512);
360 assert_eq!(runner.buffer_config.stderr_receipt_cap_bytes, 256);
361 }
362}