Skip to main content

vtcode_core/exec/
async_command.rs

1use hashbrown::HashMap;
2use std::ffi::OsString;
3use std::path::PathBuf;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use anyhow::{Context, Result, anyhow};
9use async_process::{Child, Command as AsyncCommand, ExitStatus, Stdio};
10
11use futures_lite::AsyncReadExt;
12use tokio::sync::Mutex;
13use tokio::time::{Sleep, sleep};
14use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
15
16use crate::telemetry::perf;
17use crate::utils::gatekeeper;
18
/// Default per-stream capture ceiling: 256 KiB.
const DEFAULT_CAPTURE_LIMIT: usize = 256 << 10;

/// Controls how one output stream (stdout or stderr) of a child process is captured.
#[derive(Clone, Debug)]
pub struct StreamCaptureConfig {
    /// Whether bytes read from the stream are kept in the returned output.
    pub capture: bool,
    /// Upper bound on the number of bytes kept for this stream.
    pub max_bytes: usize,
}

impl Default for StreamCaptureConfig {
    /// Capturing is enabled and bounded by `DEFAULT_CAPTURE_LIMIT`.
    fn default() -> Self {
        let capture = true;
        let max_bytes = DEFAULT_CAPTURE_LIMIT;
        StreamCaptureConfig { capture, max_bytes }
    }
}
35
36#[derive(Debug, Clone, Default)]
37pub struct ProcessOptions {
38    pub program: String,
39    pub args: Vec<String>,
40    pub env: HashMap<OsString, OsString>,
41    pub current_dir: Option<PathBuf>,
42    pub timeout: Option<Duration>,
43    pub cancellation_token: Option<CancellationToken>,
44    pub stdout: StreamCaptureConfig,
45    pub stderr: StreamCaptureConfig,
46}
47
/// Result of a finished (or forcibly stopped) child process run.
#[derive(Debug)]
pub struct ProcessOutput {
    /// Exit status reported for the child.
    pub exit_status: ExitStatus,
    /// Bytes kept from the child's standard output.
    pub stdout: Vec<u8>,
    /// Bytes kept from the child's standard error.
    pub stderr: Vec<u8>,
    /// `true` when the run ended because the configured timeout elapsed.
    pub timed_out: bool,
    /// `true` when the run ended because the cancellation token fired.
    pub cancelled: bool,
    /// Time elapsed between the start of command construction and result assembly.
    pub duration: Duration,
}
57
58pub struct AsyncProcessRunner;
59
60impl AsyncProcessRunner {
61    pub async fn run(options: ProcessOptions) -> Result<ProcessOutput> {
62        if options.program.is_empty() {
63            return Err(anyhow!("program cannot be empty"));
64        }
65
66        let mut tags = HashMap::new();
67        tags.insert("subsystem".to_string(), "async_command".to_string());
68        tags.insert("program".to_string(), options.program.clone());
69        perf::record_value("vtcode.perf.spawn_count", 1.0, tags);
70
71        gatekeeper::check_quarantine_for_program(&options.program);
72
73        let start = Instant::now();
74        let mut command = AsyncCommand::new(&options.program);
75        command.args(&options.args);
76        if let Some(dir) = &options.current_dir {
77            command.current_dir(dir);
78        }
79        if !options.env.is_empty() {
80            command.envs(&options.env);
81        }
82        command.stdout(Stdio::piped());
83        command.stderr(Stdio::piped());
84
85        let mut child = command.spawn().with_context(|| {
86            format!(
87                "failed to spawn '{}' with args {:?}",
88                options.program, options.args
89            )
90        })?;
91
92        let stdout_handle = child.stdout.take();
93        let stderr_handle = child.stderr.take();
94        let shared_child = Arc::new(Mutex::new(child));
95
96        let mut stdout_future = Box::pin(read_stream(stdout_handle, options.stdout));
97        let mut stderr_future = Box::pin(read_stream(stderr_handle, options.stderr));
98        let mut wait_future = Box::pin(wait_for_status(shared_child.clone()));
99        let mut timeout_future = options
100            .timeout
101            .map(|dur| Box::pin(sleep(dur)) as Pin<Box<Sleep>>);
102        let mut cancellation_future = options.cancellation_token.as_ref().map(|token| {
103            Box::pin(token.clone().cancelled_owned()) as Pin<Box<WaitForCancellationFutureOwned>>
104        });
105
106        enum Completion {
107            Finished,
108            TimedOut,
109            Cancelled,
110        }
111
112        let mut exit_status: Option<ExitStatus> = None;
113        let mut stdout_result: Option<Result<Vec<u8>>> = None;
114        let mut stderr_result: Option<Result<Vec<u8>>> = None;
115
116        let completion = loop {
117            tokio::select! {
118                res = &mut wait_future, if exit_status.is_none() => {
119                    exit_status = Some(res?);
120                    // Continue to drain streams
121                }
122                res = &mut stdout_future, if stdout_result.is_none() => {
123                    stdout_result = Some(res);
124                }
125                res = &mut stderr_future, if stderr_result.is_none() => {
126                    stderr_result = Some(res);
127                }
128                _ = async {
129                    if let Some(fut) = timeout_future.as_mut() {
130                        fut.as_mut().await;
131                    } else {
132                        futures::future::pending::<()>().await;
133                    }
134                }, if timeout_future.is_some() => {
135                    break Completion::TimedOut;
136                }
137                _ = async {
138                    if let Some(fut) = cancellation_future.as_mut() {
139                        fut.as_mut().await;
140                    } else {
141                        futures::future::pending::<()>().await;
142                    }
143                }, if cancellation_future.is_some() => {
144                    break Completion::Cancelled;
145                }
146            }
147
148            // Check if everything is done
149            if exit_status.is_some() && stdout_result.is_some() && stderr_result.is_some() {
150                break Completion::Finished;
151            }
152        };
153
154        let (timed_out, cancelled, status) = match completion {
155            Completion::Finished => {
156                let status = match exit_status {
157                    Some(status) => status,
158                    None => wait_future.await?,
159                };
160                (false, false, status)
161            }
162            Completion::TimedOut => {
163                kill_child(shared_child.clone()).await?;
164                let status = wait_future.await?;
165                (true, false, status)
166            }
167            Completion::Cancelled => {
168                kill_child(shared_child.clone()).await?;
169                let status = wait_future.await?;
170                (false, true, status)
171            }
172        };
173
174        // Ensure streams are fully read
175        let stdout = match stdout_result {
176            Some(Ok(data)) => data,
177            Some(Err(e)) => return Err(e),
178            None => stdout_future.await?,
179        };
180        let stderr = match stderr_result {
181            Some(Ok(data)) => data,
182            Some(Err(e)) => return Err(e),
183            None => stderr_future.await?,
184        };
185
186        Ok(ProcessOutput {
187            exit_status: status,
188            stdout,
189            stderr,
190            timed_out,
191            cancelled,
192            duration: start.elapsed(),
193        })
194    }
195}
196
197async fn read_stream<R>(reader: Option<R>, config: StreamCaptureConfig) -> Result<Vec<u8>>
198where
199    R: futures_lite::AsyncRead + Unpin,
200{
201    if !config.capture {
202        return Ok(Vec::new());
203    }
204
205    let mut reader = match reader {
206        Some(r) => r,
207        None => return Ok(Vec::new()),
208    };
209
210    let mut output = Vec::new();
211    let mut buffer = [0u8; 4096];
212    loop {
213        let read = reader.read(&mut buffer).await?;
214        if read == 0 {
215            break;
216        }
217        let remaining = config.max_bytes.saturating_sub(output.len());
218        if remaining > 0 {
219            let to_copy = remaining.min(read);
220            output.extend_from_slice(&buffer[..to_copy]);
221        }
222    }
223
224    Ok(output)
225}
226
227async fn wait_for_status(child: Arc<Mutex<Child>>) -> Result<ExitStatus> {
228    let mut guard = child.lock().await;
229    let status = guard.status().await?;
230    Ok(status)
231}
232
233async fn kill_child(child: Arc<Mutex<Child>>) -> Result<()> {
234    let mut guard = child.lock().await;
235    guard.kill()?;
236    Ok(())
237}