Skip to main content

claude_code/
process.rs

1use std::{
2    collections::BTreeMap,
3    io::{self, Write},
4    path::Path,
5    process::ExitStatus,
6    time::Duration,
7};
8
9use tokio::{
10    io::{AsyncRead, AsyncReadExt, AsyncWriteExt},
11    process::Command,
12    task, time,
13};
14
15use crate::ClaudeCodeError;
16
17const CLEANUP_GRACE: Duration = Duration::from_secs(2);
18
19async fn join_or_abort<T>(mut handle: tokio::task::JoinHandle<T>, grace: Duration) -> Option<T> {
20    if grace.is_zero() {
21        handle.abort();
22        let _ = handle.await;
23        return None;
24    }
25
26    tokio::select! {
27        output = &mut handle => output.ok(),
28        _ = time::sleep(grace) => {
29            handle.abort();
30            let _ = handle.await;
31            None
32        }
33    }
34}
35
36#[derive(Clone, Copy)]
37pub(crate) enum ConsoleTarget {
38    Stdout,
39    Stderr,
40}
41
42#[derive(Debug, Clone)]
43pub struct CommandOutput {
44    pub status: ExitStatus,
45    pub stdout: Vec<u8>,
46    pub stderr: Vec<u8>,
47}
48
49pub(crate) async fn tee_stream<R>(
50    mut reader: R,
51    target: ConsoleTarget,
52    mirror_console: bool,
53) -> Result<Vec<u8>, io::Error>
54where
55    R: AsyncRead + Unpin,
56{
57    let mut buffer = Vec::new();
58    let mut chunk = [0u8; 4096];
59    loop {
60        let n = reader.read(&mut chunk).await?;
61        if n == 0 {
62            break;
63        }
64        if mirror_console {
65            task::block_in_place(|| match target {
66                ConsoleTarget::Stdout => {
67                    let mut out = io::stdout();
68                    out.write_all(&chunk[..n])?;
69                    out.flush()
70                }
71                ConsoleTarget::Stderr => {
72                    let mut out = io::stderr();
73                    out.write_all(&chunk[..n])?;
74                    out.flush()
75                }
76            })?;
77        }
78        buffer.extend_from_slice(&chunk[..n]);
79    }
80    Ok(buffer)
81}
82
83pub(crate) fn spawn_with_retry(
84    command: &mut Command,
85    binary: &Path,
86) -> Result<tokio::process::Child, ClaudeCodeError> {
87    let mut backoff = Duration::from_millis(2);
88    for attempt in 0..5 {
89        match command.spawn() {
90            Ok(child) => return Ok(child),
91            Err(source) => {
92                let is_busy = matches!(source.kind(), std::io::ErrorKind::ExecutableFileBusy)
93                    || source.raw_os_error() == Some(26);
94                if is_busy && attempt < 4 {
95                    std::thread::sleep(backoff);
96                    backoff = std::cmp::min(backoff * 2, Duration::from_millis(50));
97                    continue;
98                }
99                return Err(ClaudeCodeError::Spawn {
100                    binary: binary.to_path_buf(),
101                    source,
102                });
103            }
104        }
105    }
106
107    unreachable!("spawn_with_retry should return before exhausting retries")
108}
109
110pub(crate) async fn run_command(
111    mut command: Command,
112    binary: &Path,
113    stdin_bytes: Option<&[u8]>,
114    timeout: Option<Duration>,
115    mirror_stdout: bool,
116    mirror_stderr: bool,
117) -> Result<CommandOutput, ClaudeCodeError> {
118    command.stdin(if stdin_bytes.is_some() {
119        std::process::Stdio::piped()
120    } else {
121        std::process::Stdio::null()
122    });
123    command.stdout(std::process::Stdio::piped());
124    command.stderr(std::process::Stdio::piped());
125    command.kill_on_drop(true);
126
127    let mut child = spawn_with_retry(&mut command, binary)?;
128
129    if let Some(bytes) = stdin_bytes {
130        if let Some(mut stdin) = child.stdin.take() {
131            stdin
132                .write_all(bytes)
133                .await
134                .map_err(ClaudeCodeError::StdinWrite)?;
135        }
136    }
137
138    let stdout = child.stdout.take().ok_or(ClaudeCodeError::MissingStdout)?;
139    let stderr = child.stderr.take().ok_or(ClaudeCodeError::MissingStderr)?;
140
141    let stdout_task = tokio::spawn(tee_stream(stdout, ConsoleTarget::Stdout, mirror_stdout));
142    let stderr_task = tokio::spawn(tee_stream(stderr, ConsoleTarget::Stderr, mirror_stderr));
143
144    let status = if let Some(dur) = timeout {
145        match time::timeout(dur, child.wait()).await {
146            Ok(Ok(status)) => status,
147            Ok(Err(source)) => {
148                let _ = child.start_kill();
149                let deadline = time::Instant::now() + CLEANUP_GRACE;
150                let remaining = deadline.saturating_duration_since(time::Instant::now());
151                let _ = time::timeout(remaining, child.wait()).await;
152                let remaining = deadline.saturating_duration_since(time::Instant::now());
153                let _ = join_or_abort(stdout_task, remaining).await;
154                let remaining = deadline.saturating_duration_since(time::Instant::now());
155                let _ = join_or_abort(stderr_task, remaining).await;
156                return Err(ClaudeCodeError::Wait(source));
157            }
158            Err(_) => {
159                let _ = child.start_kill();
160                let deadline = time::Instant::now() + CLEANUP_GRACE;
161                let remaining = deadline.saturating_duration_since(time::Instant::now());
162                let _ = time::timeout(remaining, child.wait()).await;
163                let remaining = deadline.saturating_duration_since(time::Instant::now());
164                let _ = join_or_abort(stdout_task, remaining).await;
165                let remaining = deadline.saturating_duration_since(time::Instant::now());
166                let _ = join_or_abort(stderr_task, remaining).await;
167                return Err(ClaudeCodeError::Timeout { timeout: dur });
168            }
169        }
170    } else {
171        match child.wait().await {
172            Ok(status) => status,
173            Err(source) => {
174                let _ = child.start_kill();
175                let deadline = time::Instant::now() + CLEANUP_GRACE;
176                let remaining = deadline.saturating_duration_since(time::Instant::now());
177                let _ = time::timeout(remaining, child.wait()).await;
178                let remaining = deadline.saturating_duration_since(time::Instant::now());
179                let _ = join_or_abort(stdout_task, remaining).await;
180                let remaining = deadline.saturating_duration_since(time::Instant::now());
181                let _ = join_or_abort(stderr_task, remaining).await;
182                return Err(ClaudeCodeError::Wait(source));
183            }
184        }
185    };
186
187    let stdout = stdout_task
188        .await
189        .map_err(|e| ClaudeCodeError::Join(e.to_string()))?
190        .map_err(ClaudeCodeError::StdoutRead)?;
191    let stderr = stderr_task
192        .await
193        .map_err(|e| ClaudeCodeError::Join(e.to_string()))?
194        .map_err(ClaudeCodeError::StderrRead)?;
195
196    Ok(CommandOutput {
197        status,
198        stdout,
199        stderr,
200    })
201}
202
203pub(crate) fn apply_env(command: &mut Command, env: &BTreeMap<String, String>) {
204    for (k, v) in env {
205        command.env(k, v);
206    }
207}