1use std::{
2 collections::BTreeMap,
3 io::{self, Write},
4 path::Path,
5 process::ExitStatus,
6 time::Duration,
7};
8
9use tokio::{
10 io::{AsyncRead, AsyncReadExt, AsyncWriteExt},
11 process::Command,
12 task, time,
13};
14
15use crate::ClaudeCodeError;
16
/// Total time budget for cleaning up after a failed or timed-out run: reaping
/// the killed child and draining both output reader tasks share this one
/// two-second deadline.
const CLEANUP_GRACE: Duration = Duration::from_millis(2_000);
18
19async fn join_or_abort<T>(mut handle: tokio::task::JoinHandle<T>, grace: Duration) -> Option<T> {
20 if grace.is_zero() {
21 handle.abort();
22 let _ = handle.await;
23 return None;
24 }
25
26 tokio::select! {
27 output = &mut handle => output.ok(),
28 _ = time::sleep(grace) => {
29 handle.abort();
30 let _ = handle.await;
31 None
32 }
33 }
34}
35
/// Selects which of this process's console streams captured child output is
/// mirrored to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ConsoleTarget {
    /// Mirror to this process's standard output.
    Stdout,
    /// Mirror to this process's standard error.
    Stderr,
}
41
/// Everything collected from a child process that ran to completion.
#[derive(Clone, Debug)]
pub struct CommandOutput {
    /// Exit status the child reported when it was waited on.
    pub status: ExitStatus,
    /// Raw bytes read from the child's standard output.
    pub stdout: Vec<u8>,
    /// Raw bytes read from the child's standard error.
    pub stderr: Vec<u8>,
}
48
49pub(crate) async fn tee_stream<R>(
50 mut reader: R,
51 target: ConsoleTarget,
52 mirror_console: bool,
53) -> Result<Vec<u8>, io::Error>
54where
55 R: AsyncRead + Unpin,
56{
57 let mut buffer = Vec::new();
58 let mut chunk = [0u8; 4096];
59 loop {
60 let n = reader.read(&mut chunk).await?;
61 if n == 0 {
62 break;
63 }
64 if mirror_console {
65 task::block_in_place(|| match target {
66 ConsoleTarget::Stdout => {
67 let mut out = io::stdout();
68 out.write_all(&chunk[..n])?;
69 out.flush()
70 }
71 ConsoleTarget::Stderr => {
72 let mut out = io::stderr();
73 out.write_all(&chunk[..n])?;
74 out.flush()
75 }
76 })?;
77 }
78 buffer.extend_from_slice(&chunk[..n]);
79 }
80 Ok(buffer)
81}
82
83pub(crate) fn spawn_with_retry(
84 command: &mut Command,
85 binary: &Path,
86) -> Result<tokio::process::Child, ClaudeCodeError> {
87 let mut backoff = Duration::from_millis(2);
88 for attempt in 0..5 {
89 match command.spawn() {
90 Ok(child) => return Ok(child),
91 Err(source) => {
92 let is_busy = matches!(source.kind(), std::io::ErrorKind::ExecutableFileBusy)
93 || source.raw_os_error() == Some(26);
94 if is_busy && attempt < 4 {
95 std::thread::sleep(backoff);
96 backoff = std::cmp::min(backoff * 2, Duration::from_millis(50));
97 continue;
98 }
99 return Err(ClaudeCodeError::Spawn {
100 binary: binary.to_path_buf(),
101 source,
102 });
103 }
104 }
105 }
106
107 unreachable!("spawn_with_retry should return before exhausting retries")
108}
109
110pub(crate) async fn run_command(
111 mut command: Command,
112 binary: &Path,
113 stdin_bytes: Option<&[u8]>,
114 timeout: Option<Duration>,
115 mirror_stdout: bool,
116 mirror_stderr: bool,
117) -> Result<CommandOutput, ClaudeCodeError> {
118 command.stdin(if stdin_bytes.is_some() {
119 std::process::Stdio::piped()
120 } else {
121 std::process::Stdio::null()
122 });
123 command.stdout(std::process::Stdio::piped());
124 command.stderr(std::process::Stdio::piped());
125 command.kill_on_drop(true);
126
127 let mut child = spawn_with_retry(&mut command, binary)?;
128
129 if let Some(bytes) = stdin_bytes {
130 if let Some(mut stdin) = child.stdin.take() {
131 stdin
132 .write_all(bytes)
133 .await
134 .map_err(ClaudeCodeError::StdinWrite)?;
135 }
136 }
137
138 let stdout = child.stdout.take().ok_or(ClaudeCodeError::MissingStdout)?;
139 let stderr = child.stderr.take().ok_or(ClaudeCodeError::MissingStderr)?;
140
141 let stdout_task = tokio::spawn(tee_stream(stdout, ConsoleTarget::Stdout, mirror_stdout));
142 let stderr_task = tokio::spawn(tee_stream(stderr, ConsoleTarget::Stderr, mirror_stderr));
143
144 let status = if let Some(dur) = timeout {
145 match time::timeout(dur, child.wait()).await {
146 Ok(Ok(status)) => status,
147 Ok(Err(source)) => {
148 let _ = child.start_kill();
149 let deadline = time::Instant::now() + CLEANUP_GRACE;
150 let remaining = deadline.saturating_duration_since(time::Instant::now());
151 let _ = time::timeout(remaining, child.wait()).await;
152 let remaining = deadline.saturating_duration_since(time::Instant::now());
153 let _ = join_or_abort(stdout_task, remaining).await;
154 let remaining = deadline.saturating_duration_since(time::Instant::now());
155 let _ = join_or_abort(stderr_task, remaining).await;
156 return Err(ClaudeCodeError::Wait(source));
157 }
158 Err(_) => {
159 let _ = child.start_kill();
160 let deadline = time::Instant::now() + CLEANUP_GRACE;
161 let remaining = deadline.saturating_duration_since(time::Instant::now());
162 let _ = time::timeout(remaining, child.wait()).await;
163 let remaining = deadline.saturating_duration_since(time::Instant::now());
164 let _ = join_or_abort(stdout_task, remaining).await;
165 let remaining = deadline.saturating_duration_since(time::Instant::now());
166 let _ = join_or_abort(stderr_task, remaining).await;
167 return Err(ClaudeCodeError::Timeout { timeout: dur });
168 }
169 }
170 } else {
171 match child.wait().await {
172 Ok(status) => status,
173 Err(source) => {
174 let _ = child.start_kill();
175 let deadline = time::Instant::now() + CLEANUP_GRACE;
176 let remaining = deadline.saturating_duration_since(time::Instant::now());
177 let _ = time::timeout(remaining, child.wait()).await;
178 let remaining = deadline.saturating_duration_since(time::Instant::now());
179 let _ = join_or_abort(stdout_task, remaining).await;
180 let remaining = deadline.saturating_duration_since(time::Instant::now());
181 let _ = join_or_abort(stderr_task, remaining).await;
182 return Err(ClaudeCodeError::Wait(source));
183 }
184 }
185 };
186
187 let stdout = stdout_task
188 .await
189 .map_err(|e| ClaudeCodeError::Join(e.to_string()))?
190 .map_err(ClaudeCodeError::StdoutRead)?;
191 let stderr = stderr_task
192 .await
193 .map_err(|e| ClaudeCodeError::Join(e.to_string()))?
194 .map_err(ClaudeCodeError::StderrRead)?;
195
196 Ok(CommandOutput {
197 status,
198 stdout,
199 stderr,
200 })
201}
202
203pub(crate) fn apply_env(command: &mut Command, env: &BTreeMap<String, String>) {
204 for (k, v) in env {
205 command.env(k, v);
206 }
207}