1use std::time::Duration;
2
3#[cfg(feature = "async")]
4use tokio::io::AsyncReadExt;
5#[cfg(feature = "async")]
6use tokio::process::Command;
7use tracing::{debug, warn};
8
9use crate::Claude;
10use crate::error::{Error, Result};
11
#[derive(Debug, Clone)]
/// Captured result of one finished `claude` invocation.
pub struct CommandOutput {
    /// Everything the process wrote to stdout, decoded lossily as UTF-8.
    pub stdout: String,
    /// Everything the process wrote to stderr, decoded lossily as UTF-8.
    pub stderr: String,
    /// Exit code of the process; the runners in this module store `-1` when
    /// the exit status carries no code.
    pub exit_code: i32,
    /// `true` when the process reported success; `false` when a failing exit
    /// code was explicitly tolerated by the caller.
    pub success: bool,
}
20
#[cfg(feature = "async")]
/// Runs the `claude` binary with `args` and returns its captured output.
///
/// Convenience wrapper around [`run_claude_with_retry`] that passes no
/// per-call retry override, so only the retry policy configured on `claude`
/// (if any) applies.
///
/// # Errors
///
/// Propagates whatever [`run_claude_with_retry`] returns.
pub async fn run_claude(claude: &Claude, args: Vec<String>) -> Result<CommandOutput> {
    // `None` means "no override for this call".
    let retry_override = None;
    run_claude_with_retry(claude, args, retry_override).await
}
30
#[cfg(feature = "async")]
/// Runs the `claude` binary, retrying according to the effective retry policy.
///
/// `retry_override` takes precedence over `claude.retry_policy`. When neither
/// is set the command is executed exactly once. When a policy is in effect,
/// `crate::retry::with_retry` drives the attempts and every attempt receives
/// its own clone of `args`.
///
/// # Errors
///
/// Returns the error produced by the (last) underlying execution.
pub async fn run_claude_with_retry(
    claude: &Claude,
    args: Vec<String>,
    retry_override: Option<&crate::retry::RetryPolicy>,
) -> Result<CommandOutput> {
    let effective_policy = retry_override.or(claude.retry_policy.as_ref());

    if let Some(policy) = effective_policy {
        return crate::retry::with_retry(policy, || run_claude_once(claude, args.clone())).await;
    }

    // No policy anywhere: a single attempt can consume `args` directly.
    run_claude_once(claude, args).await
}
47
#[cfg(feature = "async")]
/// Performs a single execution attempt of the `claude` binary.
///
/// The final argument list is `claude.global_args` followed by `args`. The
/// call is routed to [`run_with_timeout`] when `claude.timeout` is set and to
/// [`run_internal`] otherwise.
async fn run_claude_once(claude: &Claude, args: Vec<String>) -> Result<CommandOutput> {
    // Global arguments come first, then the per-call arguments.
    let command_args: Vec<String> = claude.global_args.iter().cloned().chain(args).collect();

    debug!(binary = %claude.binary.display(), args = ?command_args, "executing claude command");

    let working_dir = claude.working_dir.as_deref();

    match claude.timeout {
        Some(timeout) => {
            run_with_timeout(
                &claude.binary,
                &command_args,
                &claude.env,
                working_dir,
                timeout,
            )
            .await
        }
        None => run_internal(&claude.binary, &command_args, &claude.env, working_dir).await,
    }
}
81
#[cfg(feature = "async")]
/// Runs the `claude` binary, treating the listed non-zero exit codes as
/// acceptable.
///
/// If the run fails with `Error::CommandFailed` and its exit code is contained
/// in `allowed_codes`, the captured output is returned as `Ok` with
/// `success: false`. Every other outcome (success or error) is passed through
/// unchanged.
pub async fn run_claude_allow_exit_codes(
    claude: &Claude,
    args: Vec<String>,
    allowed_codes: &[i32],
) -> Result<CommandOutput> {
    run_claude(claude, args).await.or_else(|err| match err {
        Error::CommandFailed {
            exit_code,
            stdout,
            stderr,
            ..
        } if allowed_codes.contains(&exit_code) => Ok(CommandOutput {
            stdout,
            stderr,
            exit_code,
            success: false,
        }),
        other => Err(other),
    })
}
106
#[cfg(feature = "async")]
/// Spawns `binary` with `args`, waits for it to exit, and captures its output.
///
/// The child gets a null stdin, runs in `working_dir` when one is given, and
/// does not inherit `CLAUDECODE` or `CLAUDE_CODE_ENTRYPOINT` from this process.
/// Entries of `env` are applied afterwards.
///
/// # Errors
///
/// * `Error::Io` when the process cannot be spawned or awaited.
/// * `Error::CommandFailed` (carrying the captured output) when the process
///   does not report success.
async fn run_internal(
    binary: &std::path::Path,
    args: &[String],
    env: &std::collections::HashMap<String, String>,
    working_dir: Option<&std::path::Path>,
) -> Result<CommandOutput> {
    use std::process::Stdio;

    let mut cmd = Command::new(binary);
    cmd.args(args)
        .stdin(Stdio::null())
        .env_remove("CLAUDECODE")
        .env_remove("CLAUDE_CODE_ENTRYPOINT");

    if let Some(dir) = working_dir {
        cmd.current_dir(dir);
    }

    // Applied after the removals above, so explicit entries in `env` win.
    cmd.envs(env);

    let finished = match cmd.output().await {
        Ok(finished) => finished,
        Err(e) => {
            return Err(Error::Io {
                message: format!("failed to spawn claude: {e}"),
                source: e,
                working_dir: working_dir.map(std::path::Path::to_path_buf),
            });
        }
    };

    let stdout = String::from_utf8_lossy(&finished.stdout).into_owned();
    let stderr = String::from_utf8_lossy(&finished.stderr).into_owned();
    // `-1` stands in when the status carries no exit code.
    let exit_code = finished.status.code().unwrap_or(-1);

    if finished.status.success() {
        Ok(CommandOutput {
            stdout,
            stderr,
            exit_code,
            success: true,
        })
    } else {
        Err(Error::CommandFailed {
            command: format!("{} {}", binary.display(), args.join(" ")),
            exit_code,
            stdout,
            stderr,
            working_dir: working_dir.map(std::path::Path::to_path_buf),
        })
    }
}
159
#[cfg(feature = "async")]
/// Spawns `binary` with `args` and enforces `timeout` on the whole run.
///
/// The timeout covers both waiting for the process to exit and reading its
/// stdout/stderr to EOF. The child gets a null stdin, piped stdout/stderr,
/// runs in `working_dir` when one is given, and does not inherit `CLAUDECODE`
/// or `CLAUDE_CODE_ENTRYPOINT`; entries of `env` are applied afterwards.
///
/// On timeout the child is killed, each pipe is given up to 200 ms to deliver
/// its unread tail, and any output collected so far is logged at `warn` level.
/// Output read *before* the deadline is included in that log: it is
/// accumulated in buffers owned by this function rather than inside the
/// future that gets dropped when the timeout fires.
///
/// # Errors
///
/// * `Error::Io` when the process cannot be spawned or waited on.
/// * `Error::CommandFailed` when the process exits without success.
/// * `Error::Timeout` when the deadline elapses (`timeout_seconds` is the
///   timeout truncated to whole seconds).
///
/// # Panics
///
/// Panics if the spawned child exposes no stdout/stderr handle, which cannot
/// happen because both are configured as piped just before spawning.
async fn run_with_timeout(
    binary: &std::path::Path,
    args: &[String],
    env: &std::collections::HashMap<String, String>,
    working_dir: Option<&std::path::Path>,
    timeout: Duration,
) -> Result<CommandOutput> {
    /// Appends everything readable from `reader` to `sink` until EOF or a
    /// read error.
    ///
    /// `sink` belongs to the caller, so bytes appended so far survive when
    /// this future is dropped mid-read. `read` is used (instead of
    /// `read_to_end`) because it is documented as cancel safe.
    async fn drain_into<R: AsyncReadExt + Unpin>(reader: &mut R, sink: &mut Vec<u8>) {
        let mut chunk = vec![0u8; 8 * 1024];
        loop {
            match reader.read(&mut chunk).await {
                Ok(0) => break,
                Ok(n) => sink.extend_from_slice(&chunk[..n]),
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                // Best effort, like `drain`: keep what was read and stop.
                Err(_) => break,
            }
        }
    }

    let mut cmd = Command::new(binary);
    cmd.args(args);
    cmd.stdin(std::process::Stdio::null());
    cmd.stdout(std::process::Stdio::piped());
    cmd.stderr(std::process::Stdio::piped());
    cmd.env_remove("CLAUDECODE");
    cmd.env_remove("CLAUDE_CODE_ENTRYPOINT");

    if let Some(dir) = working_dir {
        cmd.current_dir(dir);
    }

    for (key, value) in env {
        cmd.env(key, value);
    }

    let mut child = cmd.spawn().map_err(|e| Error::Io {
        message: format!("failed to spawn claude: {e}"),
        source: e,
        working_dir: working_dir.map(|p| p.to_path_buf()),
    })?;

    let mut stdout = child.stdout.take().expect("stdout was piped");
    let mut stderr = child.stderr.take().expect("stderr was piped");

    // Owned outside the timed future so a timeout cannot discard them.
    let mut stdout_buf: Vec<u8> = Vec::new();
    let mut stderr_buf: Vec<u8> = Vec::new();

    // Wait and drain concurrently: a child that fills a pipe would otherwise
    // block forever waiting for a reader.
    let wait_and_drain = async {
        let (status, (), ()) = tokio::join!(
            child.wait(),
            drain_into(&mut stdout, &mut stdout_buf),
            drain_into(&mut stderr, &mut stderr_buf)
        );
        status
    };

    match tokio::time::timeout(timeout, wait_and_drain).await {
        Ok(Ok(status)) => {
            let stdout_text = String::from_utf8_lossy(&stdout_buf).into_owned();
            let stderr_text = String::from_utf8_lossy(&stderr_buf).into_owned();
            let exit_code = status.code().unwrap_or(-1);

            if !status.success() {
                return Err(Error::CommandFailed {
                    command: format!("{} {}", binary.display(), args.join(" ")),
                    exit_code,
                    stdout: stdout_text,
                    stderr: stderr_text,
                    working_dir: working_dir.map(|p| p.to_path_buf()),
                });
            }

            Ok(CommandOutput {
                stdout: stdout_text,
                stderr: stderr_text,
                exit_code,
                success: true,
            })
        }
        Ok(Err(e)) => Err(Error::Io {
            message: "failed to wait for claude process".to_string(),
            source: e,
            working_dir: working_dir.map(|p| p.to_path_buf()),
        }),
        Err(_) => {
            let _ = child.kill().await;

            // Bounded: a descendant that inherited the pipes could keep them
            // open indefinitely even though the child itself is dead.
            let drain_budget = Duration::from_millis(200);
            let stdout_tail = tokio::time::timeout(drain_budget, drain(&mut stdout))
                .await
                .unwrap_or_default();
            let stderr_tail = tokio::time::timeout(drain_budget, drain(&mut stderr))
                .await
                .unwrap_or_default();

            // Pre-deadline output followed by whatever was still unread.
            let mut stdout_str = String::from_utf8_lossy(&stdout_buf).into_owned();
            stdout_str.push_str(&stdout_tail);
            let mut stderr_str = String::from_utf8_lossy(&stderr_buf).into_owned();
            stderr_str.push_str(&stderr_tail);

            if !stdout_str.is_empty() || !stderr_str.is_empty() {
                warn!(
                    stdout = %stdout_str,
                    stderr = %stderr_str,
                    "partial output from timed-out process",
                );
            }
            Err(Error::Timeout {
                timeout_seconds: timeout.as_secs(),
            })
        }
    }
}
267
#[cfg(feature = "async")]
/// Reads `reader` to EOF and returns the bytes as a lossily decoded UTF-8
/// string.
///
/// Read errors are ignored on purpose (best effort): whatever was appended to
/// the buffer before the error is still returned.
async fn drain<R: AsyncReadExt + Unpin>(reader: &mut R) -> String {
    let mut bytes = Vec::new();
    let _ = reader.read_to_end(&mut bytes).await;

    // Reuse the allocation when the bytes are valid UTF-8; otherwise fall
    // back to replacement characters.
    match String::from_utf8(bytes) {
        Ok(text) => text,
        Err(invalid) => String::from_utf8_lossy(invalid.as_bytes()).into_owned(),
    }
}
274
#[cfg(feature = "sync")]
/// Blocking counterpart of the async runner: runs the `claude` binary with
/// `args` and returns its captured output.
///
/// Delegates to [`run_claude_with_retry_sync`] without a per-call retry
/// override, so only the retry policy configured on `claude` (if any) applies.
pub fn run_claude_sync(claude: &Claude, args: Vec<String>) -> Result<CommandOutput> {
    // `None` means "no override for this call".
    let retry_override = None;
    run_claude_with_retry_sync(claude, args, retry_override)
}
282
#[cfg(feature = "sync")]
/// Blocking run of the `claude` binary with the effective retry policy.
///
/// `retry_override` takes precedence over `claude.retry_policy`. When neither
/// is set the command is executed exactly once. When a policy is in effect,
/// `crate::retry::with_retry_sync` drives the attempts and every attempt
/// receives its own clone of `args`.
pub fn run_claude_with_retry_sync(
    claude: &Claude,
    args: Vec<String>,
    retry_override: Option<&crate::retry::RetryPolicy>,
) -> Result<CommandOutput> {
    let effective_policy = retry_override.or(claude.retry_policy.as_ref());

    if let Some(policy) = effective_policy {
        return crate::retry::with_retry_sync(policy, || {
            run_claude_once_sync(claude, args.clone())
        });
    }

    // No policy anywhere: a single attempt can consume `args` directly.
    run_claude_once_sync(claude, args)
}
299
#[cfg(feature = "sync")]
/// Performs a single blocking execution attempt of the `claude` binary.
///
/// The final argument list is `claude.global_args` followed by `args`. The
/// call is routed to [`run_with_timeout_sync`] when `claude.timeout` is set
/// and to [`run_internal_sync`] otherwise.
fn run_claude_once_sync(claude: &Claude, args: Vec<String>) -> Result<CommandOutput> {
    // Global arguments come first, then the per-call arguments.
    let command_args: Vec<String> = claude.global_args.iter().cloned().chain(args).collect();

    debug!(binary = %claude.binary.display(), args = ?command_args, "executing claude command (sync)");

    let working_dir = claude.working_dir.as_deref();

    match claude.timeout {
        Some(timeout) => run_with_timeout_sync(
            &claude.binary,
            &command_args,
            &claude.env,
            working_dir,
            timeout,
        ),
        None => run_internal_sync(&claude.binary, &command_args, &claude.env, working_dir),
    }
}
325
#[cfg(feature = "sync")]
/// Blocking run of the `claude` binary that tolerates the listed non-zero
/// exit codes.
///
/// If the run fails with `Error::CommandFailed` and its exit code is contained
/// in `allowed_codes`, the captured output is returned as `Ok` with
/// `success: false`. Every other outcome (success or error) is passed through
/// unchanged.
pub fn run_claude_allow_exit_codes_sync(
    claude: &Claude,
    args: Vec<String>,
    allowed_codes: &[i32],
) -> Result<CommandOutput> {
    run_claude_sync(claude, args).or_else(|err| match err {
        Error::CommandFailed {
            exit_code,
            stdout,
            stderr,
            ..
        } if allowed_codes.contains(&exit_code) => Ok(CommandOutput {
            stdout,
            stderr,
            exit_code,
            success: false,
        }),
        other => Err(other),
    })
}
348
#[cfg(feature = "sync")]
/// Blocking variant: spawns `binary` with `args`, waits for it to exit, and
/// captures its output.
///
/// The child gets a null stdin, runs in `working_dir` when one is given, and
/// does not inherit `CLAUDECODE` or `CLAUDE_CODE_ENTRYPOINT` from this process.
/// Entries of `env` are applied afterwards.
///
/// # Errors
///
/// * `Error::Io` when the process cannot be spawned or its output collected.
/// * `Error::CommandFailed` (carrying the captured output) when the process
///   does not report success.
fn run_internal_sync(
    binary: &std::path::Path,
    args: &[String],
    env: &std::collections::HashMap<String, String>,
    working_dir: Option<&std::path::Path>,
) -> Result<CommandOutput> {
    use std::process::{Command as StdCommand, Stdio};

    let mut cmd = StdCommand::new(binary);
    cmd.args(args)
        .stdin(Stdio::null())
        .env_remove("CLAUDECODE")
        .env_remove("CLAUDE_CODE_ENTRYPOINT");

    if let Some(dir) = working_dir {
        cmd.current_dir(dir);
    }

    // Applied after the removals above, so explicit entries in `env` win.
    cmd.envs(env);

    let finished = match cmd.output() {
        Ok(finished) => finished,
        Err(e) => {
            return Err(Error::Io {
                message: format!("failed to spawn claude: {e}"),
                source: e,
                working_dir: working_dir.map(std::path::Path::to_path_buf),
            });
        }
    };

    let stdout = String::from_utf8_lossy(&finished.stdout).into_owned();
    let stderr = String::from_utf8_lossy(&finished.stderr).into_owned();
    // `-1` stands in when the status carries no exit code.
    let exit_code = finished.status.code().unwrap_or(-1);

    if finished.status.success() {
        Ok(CommandOutput {
            stdout,
            stderr,
            exit_code,
            success: true,
        })
    } else {
        Err(Error::CommandFailed {
            command: format!("{} {}", binary.display(), args.join(" ")),
            exit_code,
            stdout,
            stderr,
            working_dir: working_dir.map(std::path::Path::to_path_buf),
        })
    }
}
399
#[cfg(feature = "sync")]
/// Blocking variant: spawns `binary` with `args` and waits at most `timeout`
/// for it to exit.
///
/// stdout and stderr are piped and each is drained on its own thread so the
/// child cannot stall on a full pipe while this thread waits. The child gets
/// a null stdin, runs in `working_dir` when one is given, and does not inherit
/// `CLAUDECODE` or `CLAUDE_CODE_ENTRYPOINT`; entries of `env` are applied
/// afterwards.
///
/// If the child has not exited when `timeout` elapses it is killed and
/// reaped, the reader threads get up to 200 ms to finish, and any output
/// collected is logged at `warn` level.
///
/// NOTE(review): after a normal exit the reader threads are joined without a
/// deadline; if a descendant process keeps a pipe open this presumably blocks
/// past `timeout` — confirm whether that case matters.
///
/// # Errors
///
/// * `Error::Io` when the process cannot be spawned or waited on.
/// * `Error::CommandFailed` when the process exits without success.
/// * `Error::Timeout` when the deadline elapses (`timeout_seconds` is the
///   timeout truncated to whole seconds).
///
/// # Panics
///
/// Panics if the spawned child exposes no stdout/stderr handle, which cannot
/// happen because both are configured as piped just before spawning.
fn run_with_timeout_sync(
    binary: &std::path::Path,
    args: &[String],
    env: &std::collections::HashMap<String, String>,
    working_dir: Option<&std::path::Path>,
    timeout: Duration,
) -> Result<CommandOutput> {
    use std::process::{Command as StdCommand, Stdio};
    use std::thread;
    use wait_timeout::ChildExt;

    let mut cmd = StdCommand::new(binary);
    cmd.args(args)
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .env_remove("CLAUDECODE")
        .env_remove("CLAUDE_CODE_ENTRYPOINT");

    if let Some(dir) = working_dir {
        cmd.current_dir(dir);
    }

    // Applied after the removals above, so explicit entries in `env` win.
    cmd.envs(env);

    let mut child = match cmd.spawn() {
        Ok(child) => child,
        Err(e) => {
            return Err(Error::Io {
                message: format!("failed to spawn claude: {e}"),
                source: e,
                working_dir: working_dir.map(std::path::Path::to_path_buf),
            });
        }
    };

    let stdout_pipe = child.stdout.take().expect("stdout was piped");
    let stderr_pipe = child.stderr.take().expect("stderr was piped");

    // Start reading before waiting so the pipes never fill up.
    let stdout_thread = thread::spawn(move || drain_sync(stdout_pipe));
    let stderr_thread = thread::spawn(move || drain_sync(stderr_pipe));

    let waited = child.wait_timeout(timeout).map_err(|e| Error::Io {
        message: "failed to wait for claude process".to_string(),
        source: e,
        working_dir: working_dir.map(std::path::Path::to_path_buf),
    })?;

    match waited {
        None => {
            // Deadline hit: kill, then reap so no zombie is left behind.
            let _ = child.kill();
            let _ = child.wait();

            let (stdout_str, stderr_str) =
                join_with_deadline(stdout_thread, stderr_thread, Duration::from_millis(200));

            let captured_anything = !(stdout_str.is_empty() && stderr_str.is_empty());
            if captured_anything {
                warn!(
                    stdout = %stdout_str,
                    stderr = %stderr_str,
                    "partial output from timed-out process",
                );
            }

            Err(Error::Timeout {
                timeout_seconds: timeout.as_secs(),
            })
        }
        Some(status) => {
            // A panicked reader thread degrades to an empty string.
            let stdout = stdout_thread.join().unwrap_or_default();
            let stderr = stderr_thread.join().unwrap_or_default();
            // `-1` stands in when the status carries no exit code.
            let exit_code = status.code().unwrap_or(-1);

            if status.success() {
                Ok(CommandOutput {
                    stdout,
                    stderr,
                    exit_code,
                    success: true,
                })
            } else {
                Err(Error::CommandFailed {
                    command: format!("{} {}", binary.display(), args.join(" ")),
                    exit_code,
                    stdout,
                    stderr,
                    working_dir: working_dir.map(std::path::Path::to_path_buf),
                })
            }
        }
    }
}
502
503#[cfg(feature = "sync")]
/// Reads `reader` to EOF and returns the bytes as a lossily decoded UTF-8
/// string.
///
/// Read errors are ignored on purpose (best effort): whatever was appended to
/// the buffer before the error is still returned.
fn drain_sync<R: std::io::Read>(mut reader: R) -> String {
    let mut bytes = Vec::new();
    let _ = reader.read_to_end(&mut bytes);

    // Reuse the allocation when the bytes are valid UTF-8; otherwise fall
    // back to replacement characters.
    match String::from_utf8(bytes) {
        Ok(text) => text,
        Err(invalid) => String::from_utf8_lossy(invalid.as_bytes()).into_owned(),
    }
}
509
510#[cfg(feature = "sync")]
/// Joins both reader threads, giving up once `budget` has elapsed.
///
/// Each handle is joined on a freshly spawned helper thread that forwards the
/// result over a channel, so this function never blocks longer than `budget`.
/// Returns `(stdout, stderr)`; a stream whose thread has not finished in time
/// (or whose thread panicked) is reported as an empty string. Helper threads
/// that miss the deadline are left running detached.
fn join_with_deadline(
    stdout_thread: std::thread::JoinHandle<String>,
    stderr_thread: std::thread::JoinHandle<String>,
    budget: Duration,
) -> (String, String) {
    use std::sync::mpsc;
    use std::thread;
    use std::time::Instant;

    /// Identifies which stream a forwarded result belongs to.
    enum Stream {
        Stdout,
        Stderr,
    }

    let (tx, rx) = mpsc::channel::<(Stream, String)>();

    // Join `handle` off-thread and report its text tagged with `tag`.
    let forward = |tag: Stream,
                   handle: thread::JoinHandle<String>,
                   sender: mpsc::Sender<(Stream, String)>| {
        thread::spawn(move || {
            let text = handle.join().unwrap_or_default();
            // The receiver may already be gone if the deadline passed.
            let _ = sender.send((tag, text));
        });
    };

    forward(Stream::Stdout, stdout_thread, tx.clone());
    forward(Stream::Stderr, stderr_thread, tx);

    let deadline = Instant::now() + budget;
    let mut captured = (String::new(), String::new());

    // At most two messages can ever arrive, one per stream.
    for _ in 0..2 {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            break;
        }
        match rx.recv_timeout(remaining) {
            Ok((Stream::Stdout, text)) => captured.0 = text,
            Ok((Stream::Stderr, text)) => captured.1 = text,
            Err(_) => break,
        }
    }

    captured
}