Skip to main content

claude_wrapper/
exec.rs

1use std::time::Duration;
2
3use tokio::io::AsyncReadExt;
4use tokio::process::Command;
5use tracing::{debug, warn};
6
7use crate::Claude;
8use crate::error::{Error, Result};
9
/// Raw output from a claude CLI invocation.
///
/// Implements `PartialEq`/`Eq` in addition to `Debug`/`Clone` so that two
/// results can be compared directly (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommandOutput {
    /// Everything the process wrote to stdout, decoded lossily as UTF-8.
    pub stdout: String,
    /// Everything the process wrote to stderr, decoded lossily as UTF-8.
    pub stderr: String,
    /// Exit code of the process; the runners in this module store `-1` when
    /// the platform reports no exit code.
    pub exit_code: i32,
    /// `true` when the process exited successfully; `false` when a non-zero
    /// exit code was explicitly tolerated by the caller.
    pub success: bool,
}
18
19/// Run a claude command with the given arguments.
20///
21/// If the [`Claude`] client has a retry policy set, transient errors will be
22/// retried according to that policy. A per-command retry policy can be passed
23/// to override the client default.
24pub async fn run_claude(claude: &Claude, args: Vec<String>) -> Result<CommandOutput> {
25    run_claude_with_retry(claude, args, None).await
26}
27
28/// Run a claude command with an optional per-command retry policy override.
29pub async fn run_claude_with_retry(
30    claude: &Claude,
31    args: Vec<String>,
32    retry_override: Option<&crate::retry::RetryPolicy>,
33) -> Result<CommandOutput> {
34    let policy = retry_override.or(claude.retry_policy.as_ref());
35
36    match policy {
37        Some(policy) => {
38            crate::retry::with_retry(policy, || run_claude_once(claude, args.clone())).await
39        }
40        None => run_claude_once(claude, args).await,
41    }
42}
43
44async fn run_claude_once(claude: &Claude, args: Vec<String>) -> Result<CommandOutput> {
45    let mut command_args = Vec::new();
46
47    // Global args first (before subcommand)
48    command_args.extend(claude.global_args.clone());
49
50    // Then command-specific args
51    command_args.extend(args);
52
53    debug!(binary = %claude.binary.display(), args = ?command_args, "executing claude command");
54
55    let output = if let Some(timeout) = claude.timeout {
56        run_with_timeout(
57            &claude.binary,
58            &command_args,
59            &claude.env,
60            claude.working_dir.as_deref(),
61            timeout,
62        )
63        .await?
64    } else {
65        run_internal(
66            &claude.binary,
67            &command_args,
68            &claude.env,
69            claude.working_dir.as_deref(),
70        )
71        .await?
72    };
73
74    Ok(output)
75}
76
77/// Run a claude command and allow specific non-zero exit codes.
78pub async fn run_claude_allow_exit_codes(
79    claude: &Claude,
80    args: Vec<String>,
81    allowed_codes: &[i32],
82) -> Result<CommandOutput> {
83    let output = run_claude(claude, args).await;
84
85    match output {
86        Err(Error::CommandFailed {
87            exit_code,
88            stdout,
89            stderr,
90            ..
91        }) if allowed_codes.contains(&exit_code) => Ok(CommandOutput {
92            stdout,
93            stderr,
94            exit_code,
95            success: false,
96        }),
97        other => other,
98    }
99}
100
101async fn run_internal(
102    binary: &std::path::Path,
103    args: &[String],
104    env: &std::collections::HashMap<String, String>,
105    working_dir: Option<&std::path::Path>,
106) -> Result<CommandOutput> {
107    let mut cmd = Command::new(binary);
108    cmd.args(args);
109
110    // Prevent child from inheriting/blocking on parent's stdin.
111    cmd.stdin(std::process::Stdio::null());
112
113    // Remove Claude Code env vars to prevent nested session detection
114    cmd.env_remove("CLAUDECODE");
115    cmd.env_remove("CLAUDE_CODE_ENTRYPOINT");
116
117    if let Some(dir) = working_dir {
118        cmd.current_dir(dir);
119    }
120
121    for (key, value) in env {
122        cmd.env(key, value);
123    }
124
125    let output = cmd.output().await.map_err(|e| Error::Io {
126        message: format!("failed to spawn claude: {e}"),
127        source: e,
128        working_dir: working_dir.map(|p| p.to_path_buf()),
129    })?;
130
131    let stdout = String::from_utf8_lossy(&output.stdout).to_string();
132    let stderr = String::from_utf8_lossy(&output.stderr).to_string();
133    let exit_code = output.status.code().unwrap_or(-1);
134
135    if !output.status.success() {
136        return Err(Error::CommandFailed {
137            command: format!("{} {}", binary.display(), args.join(" ")),
138            exit_code,
139            stdout,
140            stderr,
141            working_dir: working_dir.map(|p| p.to_path_buf()),
142        });
143    }
144
145    Ok(CommandOutput {
146        stdout,
147        stderr,
148        exit_code,
149        success: true,
150    })
151}
152
153/// Run a command with a timeout, killing and reaping the child on expiration.
154///
155/// Spawns the child explicitly (rather than wrapping `Command::output()` in a
156/// `tokio::time::timeout`) so that we retain the handle and can SIGKILL the
157/// child and wait for it when the timeout fires. Stdout and stderr are drained
158/// concurrently with `child.wait()` via `tokio::join!` so neither pipe buffer
159/// can fill up and deadlock the child.
160///
161/// On timeout, partial stdout/stderr captured before the kill is logged at
162/// warn level; the returned `Error::Timeout` itself does not carry the
163/// partial output.
164async fn run_with_timeout(
165    binary: &std::path::Path,
166    args: &[String],
167    env: &std::collections::HashMap<String, String>,
168    working_dir: Option<&std::path::Path>,
169    timeout: Duration,
170) -> Result<CommandOutput> {
171    let mut cmd = Command::new(binary);
172    cmd.args(args);
173    cmd.stdin(std::process::Stdio::null());
174    cmd.stdout(std::process::Stdio::piped());
175    cmd.stderr(std::process::Stdio::piped());
176    cmd.env_remove("CLAUDECODE");
177    cmd.env_remove("CLAUDE_CODE_ENTRYPOINT");
178
179    if let Some(dir) = working_dir {
180        cmd.current_dir(dir);
181    }
182
183    for (key, value) in env {
184        cmd.env(key, value);
185    }
186
187    let mut child = cmd.spawn().map_err(|e| Error::Io {
188        message: format!("failed to spawn claude: {e}"),
189        source: e,
190        working_dir: working_dir.map(|p| p.to_path_buf()),
191    })?;
192
193    let mut stdout = child.stdout.take().expect("stdout was piped");
194    let mut stderr = child.stderr.take().expect("stderr was piped");
195
196    // Drain stdout and stderr concurrently with the process wait so
197    // neither pipe buffer can fill up and deadlock the child.
198    // tokio::join! polls all three on the same task; no tokio::spawn
199    // (and therefore no `rt` feature) required.
200    let wait_and_drain = async {
201        let (status, stdout_str, stderr_str) =
202            tokio::join!(child.wait(), drain(&mut stdout), drain(&mut stderr));
203        (status, stdout_str, stderr_str)
204    };
205
206    match tokio::time::timeout(timeout, wait_and_drain).await {
207        Ok((Ok(status), stdout, stderr)) => {
208            let exit_code = status.code().unwrap_or(-1);
209
210            if !status.success() {
211                return Err(Error::CommandFailed {
212                    command: format!("{} {}", binary.display(), args.join(" ")),
213                    exit_code,
214                    stdout,
215                    stderr,
216                    working_dir: working_dir.map(|p| p.to_path_buf()),
217                });
218            }
219
220            Ok(CommandOutput {
221                stdout,
222                stderr,
223                exit_code,
224                success: true,
225            })
226        }
227        Ok((Err(e), _stdout, _stderr)) => Err(Error::Io {
228            message: "failed to wait for claude process".to_string(),
229            source: e,
230            working_dir: working_dir.map(|p| p.to_path_buf()),
231        }),
232        Err(_) => {
233            // Timeout: kill the child (reaps via start_kill + wait).
234            // Note that kill() only targets the direct child; if it has
235            // spawned its own subprocesses that are holding our pipe
236            // fds open, draining would block. Cap the drain with a
237            // short deadline so the timeout error returns promptly.
238            let _ = child.kill().await;
239            let drain_budget = Duration::from_millis(200);
240            let stdout_str = tokio::time::timeout(drain_budget, drain(&mut stdout))
241                .await
242                .unwrap_or_default();
243            let stderr_str = tokio::time::timeout(drain_budget, drain(&mut stderr))
244                .await
245                .unwrap_or_default();
246            if !stdout_str.is_empty() || !stderr_str.is_empty() {
247                warn!(
248                    stdout = %stdout_str,
249                    stderr = %stderr_str,
250                    "partial output from timed-out process",
251                );
252            }
253            Err(Error::Timeout {
254                timeout_seconds: timeout.as_secs(),
255            })
256        }
257    }
258}
259
260async fn drain<R: AsyncReadExt + Unpin>(reader: &mut R) -> String {
261    let mut buf = Vec::new();
262    let _ = reader.read_to_end(&mut buf).await;
263    String::from_utf8_lossy(&buf).into_owned()
264}