agcodex_core/
exec.rs

1#[cfg(unix)]
2use std::os::unix::process::ExitStatusExt;
3
4use std::collections::HashMap;
5use std::io;
6use std::path::PathBuf;
7use std::process::ExitStatus;
8use std::time::Duration;
9use std::time::Instant;
10
11use async_channel::Sender;
12use tokio::io::AsyncRead;
13use tokio::io::AsyncReadExt;
14use tokio::io::BufReader;
15use tokio::process::Child;
16
17use crate::error::CodexErr;
18use crate::error::Result;
19use crate::error::SandboxErr;
20use crate::landlock::spawn_command_under_linux_sandbox;
21use crate::modes::ModeRestrictions;
22use crate::protocol::Event;
23use crate::protocol::EventMsg;
24use crate::protocol::ExecCommandOutputDeltaEvent;
25use crate::protocol::ExecOutputStream;
26use crate::protocol::SandboxPolicy;
27use crate::seatbelt::spawn_command_under_seatbelt;
28use crate::spawn::StdioPolicy;
29use crate::spawn::spawn_child_async;
30use serde_bytes::ByteBuf;
31
32// Maximum we send for each stream, which is either:
33// - 10KiB OR
34// - 256 lines
35const MAX_STREAM_OUTPUT: usize = 10 * 1024;
36const MAX_STREAM_OUTPUT_LINES: usize = 256;
37
38const DEFAULT_TIMEOUT_MS: u64 = 10_000;
39
40// Hardcode these since it does not seem worth including the libc crate just
41// for these.
42const SIGKILL_CODE: i32 = 9;
43const TIMEOUT_CODE: i32 = 64;
44
45#[derive(Debug, Clone)]
46pub struct ExecParams {
47    pub command: Vec<String>,
48    pub cwd: PathBuf,
49    pub timeout_ms: Option<u64>,
50    pub env: HashMap<String, String>,
51    pub with_escalated_permissions: Option<bool>,
52    pub justification: Option<String>,
53}
54
55impl ExecParams {
56    pub fn timeout_duration(&self) -> Duration {
57        Duration::from_millis(self.timeout_ms.unwrap_or(DEFAULT_TIMEOUT_MS))
58    }
59}
60
61#[derive(Clone, Copy, Debug, PartialEq)]
62pub enum SandboxType {
63    None,
64
65    /// Only available on macOS.
66    MacosSeatbelt,
67
68    /// Only available on Linux.
69    LinuxSeccomp,
70}
71
72#[derive(Clone)]
73pub struct StdoutStream {
74    pub sub_id: String,
75    pub call_id: String,
76    pub tx_event: Sender<Event>,
77}
78
79pub async fn process_exec_tool_call(
80    params: ExecParams,
81    sandbox_type: SandboxType,
82    sandbox_policy: &SandboxPolicy,
83    codex_linux_sandbox_exe: &Option<PathBuf>,
84    stdout_stream: Option<StdoutStream>,
85    mode_restrictions: &ModeRestrictions,
86) -> Result<ExecToolCallOutput> {
87    // Check mode restrictions before executing commands
88    if !mode_restrictions.allow_command_exec {
89        return Err(CodexErr::ModeRestriction(
90            "Command execution not allowed in current mode".to_string(),
91        ));
92    }
93
94    let start = Instant::now();
95
96    let raw_output_result: std::result::Result<RawExecToolCallOutput, CodexErr> = match sandbox_type
97    {
98        SandboxType::None => exec(params, sandbox_policy, stdout_stream.clone()).await,
99        SandboxType::MacosSeatbelt => {
100            let timeout = params.timeout_duration();
101            let ExecParams {
102                command, cwd, env, ..
103            } = params;
104            let child = spawn_command_under_seatbelt(
105                command,
106                sandbox_policy,
107                cwd,
108                StdioPolicy::RedirectForShellTool,
109                env,
110            )
111            .await?;
112            consume_truncated_output(child, timeout, stdout_stream.clone()).await
113        }
114        SandboxType::LinuxSeccomp => {
115            let timeout = params.timeout_duration();
116            let ExecParams {
117                command, cwd, env, ..
118            } = params;
119
120            let codex_linux_sandbox_exe = codex_linux_sandbox_exe
121                .as_ref()
122                .ok_or(CodexErr::LandlockSandboxExecutableNotProvided)?;
123            let child = spawn_command_under_linux_sandbox(
124                codex_linux_sandbox_exe,
125                command,
126                sandbox_policy,
127                cwd,
128                StdioPolicy::RedirectForShellTool,
129                env,
130            )
131            .await?;
132
133            consume_truncated_output(child, timeout, stdout_stream).await
134        }
135    };
136    let duration = start.elapsed();
137    match raw_output_result {
138        Ok(raw_output) => {
139            let stdout = raw_output.stdout.from_utf8_lossy();
140            let stderr = raw_output.stderr.from_utf8_lossy();
141
142            #[cfg(target_family = "unix")]
143            match raw_output.exit_status.signal() {
144                Some(TIMEOUT_CODE) => return Err(CodexErr::Sandbox(SandboxErr::Timeout)),
145                Some(signal) => {
146                    return Err(CodexErr::Sandbox(SandboxErr::Signal(signal)));
147                }
148                None => {}
149            }
150
151            let exit_code = raw_output.exit_status.code().unwrap_or(-1);
152
153            if exit_code != 0 && is_likely_sandbox_denied(sandbox_type, exit_code) {
154                return Err(CodexErr::Sandbox(SandboxErr::Denied(
155                    exit_code,
156                    stdout.text,
157                    stderr.text,
158                )));
159            }
160
161            Ok(ExecToolCallOutput {
162                exit_code,
163                stdout,
164                stderr,
165                duration,
166            })
167        }
168        Err(err) => {
169            tracing::error!("exec error: {err}");
170            Err(err)
171        }
172    }
173}
174
175/// We don't have a fully deterministic way to tell if our command failed
176/// because of the sandbox - a command in the user's zshrc file might hit an
177/// error, but the command itself might fail or succeed for other reasons.
178/// For now, we conservatively check for 'command not found' (exit code 127),
179/// and can add additional cases as necessary.
180fn is_likely_sandbox_denied(sandbox_type: SandboxType, exit_code: i32) -> bool {
181    if sandbox_type == SandboxType::None {
182        return false;
183    }
184
185    // Quick rejects: well-known non-sandbox shell exit codes
186    // 127: command not found, 2: misuse of shell builtins
187    if exit_code == 127 {
188        return false;
189    }
190
191    // For all other cases, we assume the sandbox is the cause
192    true
193}
194
195#[derive(Debug)]
196pub struct StreamOutput<T> {
197    pub text: T,
198    pub truncated_after_lines: Option<u32>,
199}
200#[derive(Debug)]
201pub struct RawExecToolCallOutput {
202    pub exit_status: ExitStatus,
203    pub stdout: StreamOutput<Vec<u8>>,
204    pub stderr: StreamOutput<Vec<u8>>,
205}
206
207impl StreamOutput<String> {
208    pub const fn new(text: String) -> Self {
209        Self {
210            text,
211            truncated_after_lines: None,
212        }
213    }
214}
215
216impl StreamOutput<Vec<u8>> {
217    pub fn from_utf8_lossy(&self) -> StreamOutput<String> {
218        StreamOutput {
219            text: String::from_utf8_lossy(&self.text).to_string(),
220            truncated_after_lines: self.truncated_after_lines,
221        }
222    }
223}
224
225#[derive(Debug)]
226pub struct ExecToolCallOutput {
227    pub exit_code: i32,
228    pub stdout: StreamOutput<String>,
229    pub stderr: StreamOutput<String>,
230    pub duration: Duration,
231}
232
233async fn exec(
234    params: ExecParams,
235    sandbox_policy: &SandboxPolicy,
236    stdout_stream: Option<StdoutStream>,
237) -> Result<RawExecToolCallOutput> {
238    let timeout = params.timeout_duration();
239    let ExecParams {
240        command, cwd, env, ..
241    } = params;
242
243    let (program, args) = command.split_first().ok_or_else(|| {
244        CodexErr::Io(io::Error::new(
245            io::ErrorKind::InvalidInput,
246            "command args are empty",
247        ))
248    })?;
249    let arg0 = None;
250    let child = spawn_child_async(
251        PathBuf::from(program),
252        args.into(),
253        arg0,
254        cwd,
255        sandbox_policy,
256        StdioPolicy::RedirectForShellTool,
257        env,
258    )
259    .await?;
260    consume_truncated_output(child, timeout, stdout_stream).await
261}
262
263/// Consumes the output of a child process, truncating it so it is suitable for
264/// use as the output of a `shell` tool call. Also enforces specified timeout.
265pub(crate) async fn consume_truncated_output(
266    mut child: Child,
267    timeout: Duration,
268    stdout_stream: Option<StdoutStream>,
269) -> Result<RawExecToolCallOutput> {
270    // Both stdout and stderr were configured with `Stdio::piped()`
271    // above, therefore `take()` should normally return `Some`.  If it doesn't
272    // we treat it as an exceptional I/O error
273
274    let stdout_reader = child.stdout.take().ok_or_else(|| {
275        CodexErr::Io(io::Error::other(
276            "stdout pipe was unexpectedly not available",
277        ))
278    })?;
279    let stderr_reader = child.stderr.take().ok_or_else(|| {
280        CodexErr::Io(io::Error::other(
281            "stderr pipe was unexpectedly not available",
282        ))
283    })?;
284
285    let stdout_handle = tokio::spawn(read_capped(
286        BufReader::new(stdout_reader),
287        MAX_STREAM_OUTPUT,
288        MAX_STREAM_OUTPUT_LINES,
289        stdout_stream.clone(),
290        false,
291    ));
292    let stderr_handle = tokio::spawn(read_capped(
293        BufReader::new(stderr_reader),
294        MAX_STREAM_OUTPUT,
295        MAX_STREAM_OUTPUT_LINES,
296        stdout_stream.clone(),
297        true,
298    ));
299
300    let exit_status = tokio::select! {
301        result = tokio::time::timeout(timeout, child.wait()) => {
302            match result {
303                Ok(Ok(exit_status)) => exit_status,
304                Ok(e) => e?,
305                Err(_) => {
306                    // timeout
307                    child.start_kill()?;
308                    // Debatable whether `child.wait().await` should be called here.
309                    synthetic_exit_status(128 + TIMEOUT_CODE)
310                }
311            }
312        }
313        _ = tokio::signal::ctrl_c() => {
314            child.start_kill()?;
315            synthetic_exit_status(128 + SIGKILL_CODE)
316        }
317    };
318
319    let stdout = stdout_handle.await??;
320    let stderr = stderr_handle.await??;
321
322    Ok(RawExecToolCallOutput {
323        exit_status,
324        stdout,
325        stderr,
326    })
327}
328
329async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
330    mut reader: R,
331    max_output: usize,
332    max_lines: usize,
333    stream: Option<StdoutStream>,
334    is_stderr: bool,
335) -> io::Result<StreamOutput<Vec<u8>>> {
336    let mut buf = Vec::with_capacity(max_output.min(8 * 1024));
337    let mut tmp = [0u8; 8192];
338
339    let mut remaining_bytes = max_output;
340    let mut remaining_lines = max_lines;
341
342    loop {
343        let n = reader.read(&mut tmp).await?;
344        if n == 0 {
345            break;
346        }
347
348        if let Some(stream) = &stream {
349            let chunk = tmp[..n].to_vec();
350            let msg = EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
351                call_id: stream.call_id.clone(),
352                stream: if is_stderr {
353                    ExecOutputStream::Stderr
354                } else {
355                    ExecOutputStream::Stdout
356                },
357                chunk: ByteBuf::from(chunk),
358            });
359            let event = Event {
360                id: stream.sub_id.clone(),
361                msg,
362            };
363            #[allow(clippy::let_unit_value)]
364            let _ = stream.tx_event.send(event).await;
365        }
366
367        // Copy into the buffer only while we still have byte and line budget.
368        if remaining_bytes > 0 && remaining_lines > 0 {
369            let mut copy_len = 0;
370            for &b in &tmp[..n] {
371                if remaining_bytes == 0 || remaining_lines == 0 {
372                    break;
373                }
374                copy_len += 1;
375                remaining_bytes -= 1;
376                if b == b'\n' {
377                    remaining_lines -= 1;
378                }
379            }
380            buf.extend_from_slice(&tmp[..copy_len]);
381        }
382        // Continue reading to EOF to avoid back-pressure, but discard once caps are hit.
383    }
384
385    let truncated = remaining_lines == 0 || remaining_bytes == 0;
386
387    Ok(StreamOutput {
388        text: buf,
389        truncated_after_lines: if truncated {
390            Some((max_lines - remaining_lines) as u32)
391        } else {
392            None
393        },
394    })
395}
396
397#[cfg(unix)]
398fn synthetic_exit_status(code: i32) -> ExitStatus {
399    use std::os::unix::process::ExitStatusExt;
400    std::process::ExitStatus::from_raw(code)
401}
402
403#[cfg(windows)]
404fn synthetic_exit_status(code: i32) -> ExitStatus {
405    use std::os::windows::process::ExitStatusExt;
406    #[expect(clippy::unwrap_used)]
407    std::process::ExitStatus::from_raw(code.try_into().unwrap())
408}