vtcode_core/exec/
async_command.rs1use hashbrown::HashMap;
2use std::ffi::OsString;
3use std::path::PathBuf;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use anyhow::{Context, Result, anyhow};
9use async_process::{Child, Command as AsyncCommand, ExitStatus, Stdio};
10
11use futures_lite::AsyncReadExt;
12use tokio::sync::Mutex;
13use tokio::time::{Sleep, sleep};
14use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
15
16use crate::telemetry::perf;
17use crate::utils::gatekeeper;
18
/// Default per-stream capture ceiling: 256 KiB.
const DEFAULT_CAPTURE_LIMIT: usize = 256 * 1024;

/// Controls whether, and how much of, a child output stream is collected.
#[derive(Debug, Clone)]
pub struct StreamCaptureConfig {
    /// When `false`, nothing is collected for the stream and the captured
    /// buffer comes back empty.
    pub capture: bool,
    /// Maximum number of bytes retained for the stream. Output beyond this
    /// limit is still read from the child but is discarded.
    pub max_bytes: usize,
}
26
27impl Default for StreamCaptureConfig {
28 fn default() -> Self {
29 Self {
30 capture: true,
31 max_bytes: DEFAULT_CAPTURE_LIMIT,
32 }
33 }
34}
35
36#[derive(Debug, Clone, Default)]
37pub struct ProcessOptions {
38 pub program: String,
39 pub args: Vec<String>,
40 pub env: HashMap<OsString, OsString>,
41 pub current_dir: Option<PathBuf>,
42 pub timeout: Option<Duration>,
43 pub cancellation_token: Option<CancellationToken>,
44 pub stdout: StreamCaptureConfig,
45 pub stderr: StreamCaptureConfig,
46}
47
/// Outcome of a single `AsyncProcessRunner::run` invocation.
#[derive(Debug)]
pub struct ProcessOutput {
    /// Exit status reported for the child process.
    pub exit_status: ExitStatus,
    /// Bytes collected from standard output (possibly truncated or empty).
    pub stdout: Vec<u8>,
    /// Bytes collected from standard error (possibly truncated or empty).
    pub stderr: Vec<u8>,
    /// `true` when the configured timeout elapsed before the run finished.
    pub timed_out: bool,
    /// `true` when the cancellation token fired before the run finished.
    pub cancelled: bool,
    /// Time elapsed between just before spawning and building this result.
    pub duration: Duration,
}
57
58pub struct AsyncProcessRunner;
59
60impl AsyncProcessRunner {
61 pub async fn run(options: ProcessOptions) -> Result<ProcessOutput> {
62 if options.program.is_empty() {
63 return Err(anyhow!("program cannot be empty"));
64 }
65
66 let mut tags = HashMap::new();
67 tags.insert("subsystem".to_string(), "async_command".to_string());
68 tags.insert("program".to_string(), options.program.clone());
69 perf::record_value("vtcode.perf.spawn_count", 1.0, tags);
70
71 gatekeeper::check_quarantine_for_program(&options.program);
72
73 let start = Instant::now();
74 let mut command = AsyncCommand::new(&options.program);
75 command.args(&options.args);
76 if let Some(dir) = &options.current_dir {
77 command.current_dir(dir);
78 }
79 if !options.env.is_empty() {
80 command.envs(&options.env);
81 }
82 command.stdout(Stdio::piped());
83 command.stderr(Stdio::piped());
84
85 let mut child = command.spawn().with_context(|| {
86 format!(
87 "failed to spawn '{}' with args {:?}",
88 options.program, options.args
89 )
90 })?;
91
92 let stdout_handle = child.stdout.take();
93 let stderr_handle = child.stderr.take();
94 let shared_child = Arc::new(Mutex::new(child));
95
96 let mut stdout_future = Box::pin(read_stream(stdout_handle, options.stdout));
97 let mut stderr_future = Box::pin(read_stream(stderr_handle, options.stderr));
98 let mut wait_future = Box::pin(wait_for_status(shared_child.clone()));
99 let mut timeout_future = options
100 .timeout
101 .map(|dur| Box::pin(sleep(dur)) as Pin<Box<Sleep>>);
102 let mut cancellation_future = options.cancellation_token.as_ref().map(|token| {
103 Box::pin(token.clone().cancelled_owned()) as Pin<Box<WaitForCancellationFutureOwned>>
104 });
105
106 enum Completion {
107 Finished,
108 TimedOut,
109 Cancelled,
110 }
111
112 let mut exit_status: Option<ExitStatus> = None;
113 let mut stdout_result: Option<Result<Vec<u8>>> = None;
114 let mut stderr_result: Option<Result<Vec<u8>>> = None;
115
116 let completion = loop {
117 tokio::select! {
118 res = &mut wait_future, if exit_status.is_none() => {
119 exit_status = Some(res?);
120 }
122 res = &mut stdout_future, if stdout_result.is_none() => {
123 stdout_result = Some(res);
124 }
125 res = &mut stderr_future, if stderr_result.is_none() => {
126 stderr_result = Some(res);
127 }
128 _ = async {
129 if let Some(fut) = timeout_future.as_mut() {
130 fut.as_mut().await;
131 } else {
132 futures::future::pending::<()>().await;
133 }
134 }, if timeout_future.is_some() => {
135 break Completion::TimedOut;
136 }
137 _ = async {
138 if let Some(fut) = cancellation_future.as_mut() {
139 fut.as_mut().await;
140 } else {
141 futures::future::pending::<()>().await;
142 }
143 }, if cancellation_future.is_some() => {
144 break Completion::Cancelled;
145 }
146 }
147
148 if exit_status.is_some() && stdout_result.is_some() && stderr_result.is_some() {
150 break Completion::Finished;
151 }
152 };
153
154 let (timed_out, cancelled, status) = match completion {
155 Completion::Finished => {
156 let status = match exit_status {
157 Some(status) => status,
158 None => wait_future.await?,
159 };
160 (false, false, status)
161 }
162 Completion::TimedOut => {
163 kill_child(shared_child.clone()).await?;
164 let status = wait_future.await?;
165 (true, false, status)
166 }
167 Completion::Cancelled => {
168 kill_child(shared_child.clone()).await?;
169 let status = wait_future.await?;
170 (false, true, status)
171 }
172 };
173
174 let stdout = match stdout_result {
176 Some(Ok(data)) => data,
177 Some(Err(e)) => return Err(e),
178 None => stdout_future.await?,
179 };
180 let stderr = match stderr_result {
181 Some(Ok(data)) => data,
182 Some(Err(e)) => return Err(e),
183 None => stderr_future.await?,
184 };
185
186 Ok(ProcessOutput {
187 exit_status: status,
188 stdout,
189 stderr,
190 timed_out,
191 cancelled,
192 duration: start.elapsed(),
193 })
194 }
195}
196
197async fn read_stream<R>(reader: Option<R>, config: StreamCaptureConfig) -> Result<Vec<u8>>
198where
199 R: futures_lite::AsyncRead + Unpin,
200{
201 if !config.capture {
202 return Ok(Vec::new());
203 }
204
205 let mut reader = match reader {
206 Some(r) => r,
207 None => return Ok(Vec::new()),
208 };
209
210 let mut output = Vec::new();
211 let mut buffer = [0u8; 4096];
212 loop {
213 let read = reader.read(&mut buffer).await?;
214 if read == 0 {
215 break;
216 }
217 let remaining = config.max_bytes.saturating_sub(output.len());
218 if remaining > 0 {
219 let to_copy = remaining.min(read);
220 output.extend_from_slice(&buffer[..to_copy]);
221 }
222 }
223
224 Ok(output)
225}
226
227async fn wait_for_status(child: Arc<Mutex<Child>>) -> Result<ExitStatus> {
228 let mut guard = child.lock().await;
229 let status = guard.status().await?;
230 Ok(status)
231}
232
233async fn kill_child(child: Arc<Mutex<Child>>) -> Result<()> {
234 let mut guard = child.lock().await;
235 guard.kill()?;
236 Ok(())
237}