Skip to main content

claude_wrapper/
exec.rs

1use std::time::Duration;
2
3#[cfg(feature = "async")]
4use tokio::io::AsyncReadExt;
5#[cfg(feature = "async")]
6use tokio::process::Command;
7use tracing::{debug, warn};
8
9use crate::Claude;
10use crate::error::{Error, Result};
11
/// Raw output from a claude CLI invocation.
///
/// Both output streams are decoded lossily: invalid UTF-8 sequences are
/// replaced with `U+FFFD` instead of producing an error.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommandOutput {
    /// Everything the process wrote to standard output.
    pub stdout: String,
    /// Everything the process wrote to standard error.
    pub stderr: String,
    /// The process exit code, or `-1` when the platform reported none
    /// (e.g. the process was terminated by a signal on Unix).
    pub exit_code: i32,
    /// `true` when the process exited successfully. Results produced by the
    /// `allow_exit_codes` helpers for a tolerated non-zero exit carry `false`
    /// here together with that `exit_code`.
    pub success: bool,
}
20
/// Run a claude command with the given arguments.
///
/// Uses the client's default retry policy when one is configured, retrying
/// transient errors according to it. Call [`run_claude_with_retry`] instead
/// to supply a per-command policy that overrides the client default.
#[cfg(feature = "async")]
pub async fn run_claude(claude: &Claude, args: Vec<String>) -> Result<CommandOutput> {
    let no_override: Option<&crate::retry::RetryPolicy> = None;
    run_claude_with_retry(claude, args, no_override).await
}
30
/// Run a claude command with an optional per-command retry policy override.
///
/// `retry_override` takes precedence over the client's own `retry_policy`.
/// When neither is present, the command is attempted exactly once.
#[cfg(feature = "async")]
pub async fn run_claude_with_retry(
    claude: &Claude,
    args: Vec<String>,
    retry_override: Option<&crate::retry::RetryPolicy>,
) -> Result<CommandOutput> {
    if let Some(policy) = retry_override.or(claude.retry_policy.as_ref()) {
        // Every attempt gets its own copy of the argument list.
        let attempt = || run_claude_once(claude, args.clone());
        return crate::retry::with_retry(policy, attempt).await;
    }
    run_claude_once(claude, args).await
}
47
/// Execute a single claude invocation, without any retry handling.
///
/// The client's global arguments are placed before `args` so they are parsed
/// ahead of the subcommand. When the client has a timeout configured the
/// timeout-aware runner is used; otherwise the process runs to completion.
#[cfg(feature = "async")]
async fn run_claude_once(claude: &Claude, args: Vec<String>) -> Result<CommandOutput> {
    let command_args: Vec<String> = claude.global_args.iter().cloned().chain(args).collect();

    debug!(binary = %claude.binary.display(), args = ?command_args, "executing claude command");

    let binary = &claude.binary;
    let env = &claude.env;
    let working_dir = claude.working_dir.as_deref();

    match claude.timeout {
        Some(limit) => run_with_timeout(binary, &command_args, env, working_dir, limit).await,
        None => run_internal(binary, &command_args, env, working_dir).await,
    }
}
81
/// Run a claude command and allow specific non-zero exit codes.
///
/// An [`Error::CommandFailed`] whose exit code appears in `allowed_codes` is
/// turned into an `Ok` [`CommandOutput`] with `success: false`. Every other
/// outcome, success or error, is returned unchanged.
#[cfg(feature = "async")]
pub async fn run_claude_allow_exit_codes(
    claude: &Claude,
    args: Vec<String>,
    allowed_codes: &[i32],
) -> Result<CommandOutput> {
    let outcome = run_claude(claude, args).await;
    outcome.or_else(|failure| match failure {
        Error::CommandFailed {
            exit_code,
            stdout,
            stderr,
            ..
        } if allowed_codes.contains(&exit_code) => {
            // Tolerated exit code: surface the output but mark it unsuccessful.
            Ok(CommandOutput {
                stdout,
                stderr,
                exit_code,
                success: false,
            })
        }
        unexpected => Err(unexpected),
    })
}
106
/// Spawn the claude binary, wait for it to exit, and collect all of its
/// stdout and stderr (no timeout).
///
/// Stdin is connected to null, and the Claude Code session markers
/// (`CLAUDECODE`, `CLAUDE_CODE_ENTRYPOINT`) are removed before the
/// caller-supplied `env` entries are applied. A spawn failure becomes
/// [`Error::Io`]; a non-zero exit becomes [`Error::CommandFailed`].
#[cfg(feature = "async")]
async fn run_internal(
    binary: &std::path::Path,
    args: &[String],
    env: &std::collections::HashMap<String, String>,
    working_dir: Option<&std::path::Path>,
) -> Result<CommandOutput> {
    let owned_dir = || working_dir.map(std::path::Path::to_path_buf);

    let mut command = Command::new(binary);
    command
        .args(args)
        // Never let the child inherit (and block on) our stdin.
        .stdin(std::process::Stdio::null())
        // Strip Claude Code markers so the child doesn't detect a nested session.
        .env_remove("CLAUDECODE")
        .env_remove("CLAUDE_CODE_ENTRYPOINT");
    if let Some(dir) = working_dir {
        command.current_dir(dir);
    }
    command.envs(env);

    let output = command.output().await.map_err(|source| Error::Io {
        message: format!("failed to spawn claude: {source}"),
        source,
        working_dir: owned_dir(),
    })?;

    let exit_code = output.status.code().unwrap_or(-1);
    let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
    let stderr = String::from_utf8_lossy(&output.stderr).into_owned();

    if output.status.success() {
        Ok(CommandOutput {
            stdout,
            stderr,
            exit_code,
            success: true,
        })
    } else {
        Err(Error::CommandFailed {
            command: format!("{} {}", binary.display(), args.join(" ")),
            exit_code,
            stdout,
            stderr,
            working_dir: owned_dir(),
        })
    }
}
159
/// Run a command with a timeout, killing and reaping the child on expiration.
///
/// Spawns the child explicitly (rather than wrapping `Command::output()` in a
/// `tokio::time::timeout`) so that we retain the handle and can SIGKILL the
/// child and wait for it when the timeout fires. Stdout and stderr are drained
/// concurrently with `child.wait()` via `tokio::join!` so neither pipe buffer
/// can fill up and deadlock the child.
///
/// The drained bytes are accumulated in buffers owned by this function, not
/// by the drain futures. When the timeout fires and those futures are
/// cancelled, everything read so far is kept. It is then topped up with
/// whatever is still in the pipes after the kill, within a short budget.
///
/// On timeout, that partial stdout/stderr is logged at warn level; the
/// returned `Error::Timeout` itself does not carry the partial output.
/// If waiting on the child fails, the child is killed before the
/// `Error::Io` is returned.
#[cfg(feature = "async")]
async fn run_with_timeout(
    binary: &std::path::Path,
    args: &[String],
    env: &std::collections::HashMap<String, String>,
    working_dir: Option<&std::path::Path>,
    timeout: Duration,
) -> Result<CommandOutput> {
    // How long to keep collecting trailing output after killing the child.
    const KILL_DRAIN_BUDGET: Duration = Duration::from_millis(200);

    let mut cmd = Command::new(binary);
    cmd.args(args);
    cmd.stdin(std::process::Stdio::null());
    cmd.stdout(std::process::Stdio::piped());
    cmd.stderr(std::process::Stdio::piped());
    cmd.env_remove("CLAUDECODE");
    cmd.env_remove("CLAUDE_CODE_ENTRYPOINT");

    if let Some(dir) = working_dir {
        cmd.current_dir(dir);
    }

    for (key, value) in env {
        cmd.env(key, value);
    }

    let mut child = cmd.spawn().map_err(|e| Error::Io {
        message: format!("failed to spawn claude: {e}"),
        source: e,
        working_dir: working_dir.map(|p| p.to_path_buf()),
    })?;

    let mut stdout_pipe = child.stdout.take().expect("stdout was piped");
    let mut stderr_pipe = child.stderr.take().expect("stderr was piped");

    // Owned here, outside the timed future, so bytes read before a timeout
    // survive the cancellation of the drain futures.
    let mut stdout_buf = Vec::new();
    let mut stderr_buf = Vec::new();

    // Drain both pipes concurrently with the process wait so neither pipe
    // buffer can fill up and deadlock the child. tokio::join! polls all three
    // on the same task; no tokio::spawn (and therefore no `rt` feature) needed.
    let waited = tokio::time::timeout(timeout, async {
        let (status, (), ()) = tokio::join!(
            child.wait(),
            drain_into(&mut stdout_pipe, &mut stdout_buf),
            drain_into(&mut stderr_pipe, &mut stderr_buf)
        );
        status
    })
    .await;

    match waited {
        Ok(Ok(status)) => {
            let stdout = String::from_utf8_lossy(&stdout_buf).into_owned();
            let stderr = String::from_utf8_lossy(&stderr_buf).into_owned();
            let exit_code = status.code().unwrap_or(-1);

            if !status.success() {
                return Err(Error::CommandFailed {
                    command: format!("{} {}", binary.display(), args.join(" ")),
                    exit_code,
                    stdout,
                    stderr,
                    working_dir: working_dir.map(|p| p.to_path_buf()),
                });
            }

            Ok(CommandOutput {
                stdout,
                stderr,
                exit_code,
                success: true,
            })
        }
        Ok(Err(e)) => {
            // We can no longer observe the child; don't leave it running
            // unsupervised after returning. Best effort, errors ignored.
            let _ = child.kill().await;
            Err(Error::Io {
                message: "failed to wait for claude process".to_string(),
                source: e,
                working_dir: working_dir.map(|p| p.to_path_buf()),
            })
        }
        Err(_elapsed) => {
            // Timeout: kill the child (tokio's kill() = start_kill + wait, so
            // it is reaped). kill() only targets the direct child; if it has
            // spawned subprocesses still holding our pipe fds, draining would
            // block. Cap the tail drain (both streams together) with a short
            // budget so the timeout error returns promptly. Bytes read before
            // the cap are kept either way.
            let _ = child.kill().await;
            let _ = tokio::time::timeout(KILL_DRAIN_BUDGET, async {
                tokio::join!(
                    drain_into(&mut stdout_pipe, &mut stdout_buf),
                    drain_into(&mut stderr_pipe, &mut stderr_buf)
                )
            })
            .await;

            let stdout_str = String::from_utf8_lossy(&stdout_buf);
            let stderr_str = String::from_utf8_lossy(&stderr_buf);
            if !stdout_str.is_empty() || !stderr_str.is_empty() {
                warn!(
                    stdout = %stdout_str,
                    stderr = %stderr_str,
                    "partial output from timed-out process",
                );
            }
            Err(Error::Timeout {
                timeout_seconds: timeout.as_secs(),
            })
        }
    }
}

/// Read `reader` until EOF, appending each chunk to `sink` as soon as it is read.
///
/// Each `read` call is cancel safe, and bytes are moved into `sink` within the
/// same poll that reads them. So if this future is dropped part-way (e.g. by an
/// enclosing `tokio::time::timeout`), everything read so far stays in `sink`.
/// `Interrupted` errors are retried; any other read error silently ends the
/// drain, keeping whatever was already read.
#[cfg(feature = "async")]
async fn drain_into<R: AsyncReadExt + Unpin>(reader: &mut R, sink: &mut Vec<u8>) {
    let mut chunk = [0u8; 8 * 1024];
    loop {
        match reader.read(&mut chunk).await {
            Ok(0) => break,
            Ok(n) => sink.extend_from_slice(&chunk[..n]),
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
            Err(_) => break,
        }
    }
}
274
275// ---------- sync twins ----------
276
/// Blocking mirror of [`run_claude`]. Available with the `sync` feature.
///
/// Applies the client's default retry policy, if one is configured.
#[cfg(feature = "sync")]
pub fn run_claude_sync(claude: &Claude, args: Vec<String>) -> Result<CommandOutput> {
    let no_override: Option<&crate::retry::RetryPolicy> = None;
    run_claude_with_retry_sync(claude, args, no_override)
}
282
/// Blocking mirror of [`run_claude_with_retry`].
///
/// `retry_override` takes precedence over the client's `retry_policy`; with
/// neither present, the command is attempted exactly once.
#[cfg(feature = "sync")]
pub fn run_claude_with_retry_sync(
    claude: &Claude,
    args: Vec<String>,
    retry_override: Option<&crate::retry::RetryPolicy>,
) -> Result<CommandOutput> {
    if let Some(policy) = retry_override.or(claude.retry_policy.as_ref()) {
        // Every attempt gets its own copy of the argument list.
        let attempt = || run_claude_once_sync(claude, args.clone());
        return crate::retry::with_retry_sync(policy, attempt);
    }
    run_claude_once_sync(claude, args)
}
299
/// Blocking single invocation of the claude binary, without retry handling.
///
/// Global arguments precede `args`. When the client has a timeout configured,
/// the timeout-aware runner is used; otherwise the process runs to completion.
#[cfg(feature = "sync")]
fn run_claude_once_sync(claude: &Claude, args: Vec<String>) -> Result<CommandOutput> {
    let command_args: Vec<String> = claude.global_args.iter().cloned().chain(args).collect();

    debug!(binary = %claude.binary.display(), args = ?command_args, "executing claude command (sync)");

    let binary = &claude.binary;
    let env = &claude.env;
    let working_dir = claude.working_dir.as_deref();

    match claude.timeout {
        Some(limit) => run_with_timeout_sync(binary, &command_args, env, working_dir, limit),
        None => run_internal_sync(binary, &command_args, env, working_dir),
    }
}
325
/// Blocking mirror of [`run_claude_allow_exit_codes`].
///
/// An [`Error::CommandFailed`] whose exit code appears in `allowed_codes` is
/// turned into an `Ok` [`CommandOutput`] with `success: false`. Every other
/// outcome is returned unchanged.
#[cfg(feature = "sync")]
pub fn run_claude_allow_exit_codes_sync(
    claude: &Claude,
    args: Vec<String>,
    allowed_codes: &[i32],
) -> Result<CommandOutput> {
    let outcome = run_claude_sync(claude, args);
    outcome.or_else(|failure| match failure {
        Error::CommandFailed {
            exit_code,
            stdout,
            stderr,
            ..
        } if allowed_codes.contains(&exit_code) => {
            // Tolerated exit code: surface the output but mark it unsuccessful.
            Ok(CommandOutput {
                stdout,
                stderr,
                exit_code,
                success: false,
            })
        }
        unexpected => Err(unexpected),
    })
}
348
/// Blocking counterpart of the async `run_internal`.
///
/// Spawns the claude binary, waits for it to exit, and collects all of its
/// stdout and stderr (no timeout). Stdin is connected to null, and the Claude
/// Code session markers (`CLAUDECODE`, `CLAUDE_CODE_ENTRYPOINT`) are removed
/// before the caller-supplied `env` entries are applied. A spawn failure
/// becomes [`Error::Io`]; a non-zero exit becomes [`Error::CommandFailed`].
#[cfg(feature = "sync")]
fn run_internal_sync(
    binary: &std::path::Path,
    args: &[String],
    env: &std::collections::HashMap<String, String>,
    working_dir: Option<&std::path::Path>,
) -> Result<CommandOutput> {
    use std::process::{Command as StdCommand, Stdio};

    let owned_dir = || working_dir.map(std::path::Path::to_path_buf);

    let mut command = StdCommand::new(binary);
    command
        .args(args)
        .stdin(Stdio::null())
        .env_remove("CLAUDECODE")
        .env_remove("CLAUDE_CODE_ENTRYPOINT");
    if let Some(dir) = working_dir {
        command.current_dir(dir);
    }
    command.envs(env);

    let output = command.output().map_err(|source| Error::Io {
        message: format!("failed to spawn claude: {source}"),
        source,
        working_dir: owned_dir(),
    })?;

    let exit_code = output.status.code().unwrap_or(-1);
    let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
    let stderr = String::from_utf8_lossy(&output.stderr).into_owned();

    if output.status.success() {
        Ok(CommandOutput {
            stdout,
            stderr,
            exit_code,
            success: true,
        })
    } else {
        Err(Error::CommandFailed {
            command: format!("{} {}", binary.display(), args.join(" ")),
            exit_code,
            stdout,
            stderr,
            working_dir: owned_dir(),
        })
    }
}
399
/// Blocking run with a timeout. Mirrors the async `run_with_timeout`: spawns
/// the child, drains stdout/stderr on dedicated threads so neither
/// pipe buffer can fill up while we wait, then uses
/// [`wait_timeout::ChildExt::wait_timeout`] to enforce the deadline.
/// On timeout, the child is SIGKILLed and reaped; partial output is
/// logged at warn but the returned [`Error::Timeout`] does not carry it.
///
/// If waiting on the child fails, the child is killed and reaped before the
/// [`Error::Io`] is returned, so it does not keep running after this call.
///
/// NOTE(review): once the child has exited normally, the drain threads are
/// joined without a deadline. A grandchild that inherited the stdout/stderr
/// pipes and keeps running would keep this call blocked past `timeout`.
#[cfg(feature = "sync")]
fn run_with_timeout_sync(
    binary: &std::path::Path,
    args: &[String],
    env: &std::collections::HashMap<String, String>,
    working_dir: Option<&std::path::Path>,
    timeout: Duration,
) -> Result<CommandOutput> {
    use std::process::{Command as StdCommand, Stdio};
    use std::thread;
    use wait_timeout::ChildExt;

    // How long to wait for the drain threads after killing the child.
    const KILL_DRAIN_BUDGET: Duration = Duration::from_millis(200);

    let mut cmd = StdCommand::new(binary);
    cmd.args(args);
    cmd.stdin(Stdio::null());
    cmd.stdout(Stdio::piped());
    cmd.stderr(Stdio::piped());
    cmd.env_remove("CLAUDECODE");
    cmd.env_remove("CLAUDE_CODE_ENTRYPOINT");

    if let Some(dir) = working_dir {
        cmd.current_dir(dir);
    }

    for (key, value) in env {
        cmd.env(key, value);
    }

    let mut child = cmd.spawn().map_err(|e| Error::Io {
        message: format!("failed to spawn claude: {e}"),
        source: e,
        working_dir: working_dir.map(|p| p.to_path_buf()),
    })?;

    // Detach stdout/stderr onto their own threads so neither can block
    // the child by filling its pipe buffer. Each thread owns its half
    // and drops it on completion, which closes the parent's fd and
    // lets read_to_end() return EOF once the child exits.
    let stdout = child.stdout.take().expect("stdout was piped");
    let stderr = child.stderr.take().expect("stderr was piped");

    let stdout_thread = thread::spawn(move || drain_sync(stdout));
    let stderr_thread = thread::spawn(move || drain_sync(stderr));

    let waited = match child.wait_timeout(timeout) {
        Ok(waited) => waited,
        Err(e) => {
            // We can no longer observe the child. std's Child does not kill on
            // drop, so kill and reap it explicitly rather than leak a running
            // process. This also closes its pipe ends so the detached drain
            // threads can reach EOF (unless a grandchild still holds them).
            let _ = child.kill();
            let _ = child.wait();
            return Err(Error::Io {
                message: "failed to wait for claude process".to_string(),
                source: e,
                working_dir: working_dir.map(|p| p.to_path_buf()),
            });
        }
    };

    match waited {
        Some(status) => {
            let stdout = stdout_thread.join().unwrap_or_default();
            let stderr = stderr_thread.join().unwrap_or_default();
            let exit_code = status.code().unwrap_or(-1);

            if !status.success() {
                return Err(Error::CommandFailed {
                    command: format!("{} {}", binary.display(), args.join(" ")),
                    exit_code,
                    stdout,
                    stderr,
                    working_dir: working_dir.map(|p| p.to_path_buf()),
                });
            }

            Ok(CommandOutput {
                stdout,
                stderr,
                exit_code,
                success: true,
            })
        }
        None => {
            // Timeout: SIGKILL and reap. If the child has spawned
            // subprocesses that inherited our pipe fds, the drain
            // threads can block indefinitely; cap the join with a
            // short budget so the timeout error returns promptly.
            let _ = child.kill();
            let _ = child.wait();

            let (stdout_str, stderr_str) =
                join_with_deadline(stdout_thread, stderr_thread, KILL_DRAIN_BUDGET);

            if !stdout_str.is_empty() || !stderr_str.is_empty() {
                warn!(
                    stdout = %stdout_str,
                    stderr = %stderr_str,
                    "partial output from timed-out process",
                );
            }

            Err(Error::Timeout {
                timeout_seconds: timeout.as_secs(),
            })
        }
    }
}
502
/// Read `reader` to EOF and decode the collected bytes as UTF-8, lossily.
///
/// Best effort: a read error simply stops the read, and whatever bytes
/// arrived before it are still decoded and returned.
#[cfg(feature = "sync")]
fn drain_sync<R: std::io::Read>(mut reader: R) -> String {
    let mut collected: Vec<u8> = Vec::new();
    let _ = std::io::Read::read_to_end(&mut reader, &mut collected);
    let text = String::from_utf8_lossy(&collected);
    text.into_owned()
}
509
/// Join both drain threads, giving up on whichever misses the shared deadline.
///
/// Each handle is joined on a helper thread that reports its result over a
/// channel; the caller waits at most `budget` in total for both reports.
/// A thread that misses the deadline, or that panicked, contributes "".
/// std threads cannot be cancelled, so if the child's subprocesses still hold
/// a pipe fd open after kill(), the drain thread (and its helper) leaks. That
/// is a pathological case; with a responsive child the joins complete almost
/// immediately.
#[cfg(feature = "sync")]
fn join_with_deadline(
    stdout_thread: std::thread::JoinHandle<String>,
    stderr_thread: std::thread::JoinHandle<String>,
    budget: Duration,
) -> (String, String) {
    use std::sync::mpsc;
    use std::time::Instant;

    // `true` tags the stdout result, `false` the stderr result.
    let (sender, receiver) = mpsc::channel::<(bool, String)>();
    for (is_stdout, handle) in [(true, stdout_thread), (false, stderr_thread)] {
        let sender = sender.clone();
        std::thread::spawn(move || {
            let captured = handle.join().unwrap_or_default();
            let _ = sender.send((is_stdout, captured));
        });
    }
    drop(sender);

    let deadline = Instant::now() + budget;
    let mut out = String::new();
    let mut err = String::new();

    for _ in 0..2 {
        let remaining = match deadline.checked_duration_since(Instant::now()) {
            Some(left) if !left.is_zero() => left,
            _ => break,
        };
        match receiver.recv_timeout(remaining) {
            Ok((true, captured)) => out = captured,
            Ok((false, captured)) => err = captured,
            Err(_) => break,
        }
    }

    (out, err)
}