1#[cfg(unix)]
2use std::os::unix::process::ExitStatusExt;
3
4use std::collections::HashMap;
5use std::io;
6use std::path::PathBuf;
7use std::process::ExitStatus;
8use std::time::Duration;
9use std::time::Instant;
10
11use async_channel::Sender;
12use tokio::io::AsyncRead;
13use tokio::io::AsyncReadExt;
14use tokio::io::BufReader;
15use tokio::process::Child;
16
17use crate::error::CodexErr;
18use crate::error::Result;
19use crate::error::SandboxErr;
20use crate::landlock::spawn_command_under_linux_sandbox;
21use crate::modes::ModeRestrictions;
22use crate::protocol::Event;
23use crate::protocol::EventMsg;
24use crate::protocol::ExecCommandOutputDeltaEvent;
25use crate::protocol::ExecOutputStream;
26use crate::protocol::SandboxPolicy;
27use crate::seatbelt::spawn_command_under_seatbelt;
28use crate::spawn::StdioPolicy;
29use crate::spawn::spawn_child_async;
30use serde_bytes::ByteBuf;
31
/// Per-stream cap, in bytes, on the output retained from a child process (10 KiB).
const MAX_STREAM_OUTPUT: usize = 10 * 1024;
/// Per-stream cap, in newline characters, on the output retained from a child process.
const MAX_STREAM_OUTPUT_LINES: usize = 256;

/// Timeout, in milliseconds, used when `ExecParams::timeout_ms` is `None`.
const DEFAULT_TIMEOUT_MS: u64 = 10_000;

/// Value added to 128 to build the synthetic exit status reported after Ctrl-C.
const SIGKILL_CODE: i32 = 9;
/// Value added to 128 to build the synthetic exit status reported after a timeout;
/// `process_exec_tool_call` matches it against the reported signal number.
const TIMEOUT_CODE: i32 = 64;
44
45#[derive(Debug, Clone)]
46pub struct ExecParams {
47 pub command: Vec<String>,
48 pub cwd: PathBuf,
49 pub timeout_ms: Option<u64>,
50 pub env: HashMap<String, String>,
51 pub with_escalated_permissions: Option<bool>,
52 pub justification: Option<String>,
53}
54
55impl ExecParams {
56 pub fn timeout_duration(&self) -> Duration {
57 Duration::from_millis(self.timeout_ms.unwrap_or(DEFAULT_TIMEOUT_MS))
58 }
59}
60
/// Mechanism used by `process_exec_tool_call` to launch a command.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SandboxType {
    /// Spawn the command directly through `spawn_child_async`.
    None,

    /// Spawn the command through `spawn_command_under_seatbelt`.
    MacosSeatbelt,

    /// Spawn the command through `spawn_command_under_linux_sandbox`.
    LinuxSeccomp,
}
71
72#[derive(Clone)]
73pub struct StdoutStream {
74 pub sub_id: String,
75 pub call_id: String,
76 pub tx_event: Sender<Event>,
77}
78
79pub async fn process_exec_tool_call(
80 params: ExecParams,
81 sandbox_type: SandboxType,
82 sandbox_policy: &SandboxPolicy,
83 codex_linux_sandbox_exe: &Option<PathBuf>,
84 stdout_stream: Option<StdoutStream>,
85 mode_restrictions: &ModeRestrictions,
86) -> Result<ExecToolCallOutput> {
87 if !mode_restrictions.allow_command_exec {
89 return Err(CodexErr::ModeRestriction(
90 "Command execution not allowed in current mode".to_string(),
91 ));
92 }
93
94 let start = Instant::now();
95
96 let raw_output_result: std::result::Result<RawExecToolCallOutput, CodexErr> = match sandbox_type
97 {
98 SandboxType::None => exec(params, sandbox_policy, stdout_stream.clone()).await,
99 SandboxType::MacosSeatbelt => {
100 let timeout = params.timeout_duration();
101 let ExecParams {
102 command, cwd, env, ..
103 } = params;
104 let child = spawn_command_under_seatbelt(
105 command,
106 sandbox_policy,
107 cwd,
108 StdioPolicy::RedirectForShellTool,
109 env,
110 )
111 .await?;
112 consume_truncated_output(child, timeout, stdout_stream.clone()).await
113 }
114 SandboxType::LinuxSeccomp => {
115 let timeout = params.timeout_duration();
116 let ExecParams {
117 command, cwd, env, ..
118 } = params;
119
120 let codex_linux_sandbox_exe = codex_linux_sandbox_exe
121 .as_ref()
122 .ok_or(CodexErr::LandlockSandboxExecutableNotProvided)?;
123 let child = spawn_command_under_linux_sandbox(
124 codex_linux_sandbox_exe,
125 command,
126 sandbox_policy,
127 cwd,
128 StdioPolicy::RedirectForShellTool,
129 env,
130 )
131 .await?;
132
133 consume_truncated_output(child, timeout, stdout_stream).await
134 }
135 };
136 let duration = start.elapsed();
137 match raw_output_result {
138 Ok(raw_output) => {
139 let stdout = raw_output.stdout.from_utf8_lossy();
140 let stderr = raw_output.stderr.from_utf8_lossy();
141
142 #[cfg(target_family = "unix")]
143 match raw_output.exit_status.signal() {
144 Some(TIMEOUT_CODE) => return Err(CodexErr::Sandbox(SandboxErr::Timeout)),
145 Some(signal) => {
146 return Err(CodexErr::Sandbox(SandboxErr::Signal(signal)));
147 }
148 None => {}
149 }
150
151 let exit_code = raw_output.exit_status.code().unwrap_or(-1);
152
153 if exit_code != 0 && is_likely_sandbox_denied(sandbox_type, exit_code) {
154 return Err(CodexErr::Sandbox(SandboxErr::Denied(
155 exit_code,
156 stdout.text,
157 stderr.text,
158 )));
159 }
160
161 Ok(ExecToolCallOutput {
162 exit_code,
163 stdout,
164 stderr,
165 duration,
166 })
167 }
168 Err(err) => {
169 tracing::error!("exec error: {err}");
170 Err(err)
171 }
172 }
173}
174
175fn is_likely_sandbox_denied(sandbox_type: SandboxType, exit_code: i32) -> bool {
181 if sandbox_type == SandboxType::None {
182 return false;
183 }
184
185 if exit_code == 127 {
188 return false;
189 }
190
191 true
193}
194
/// Output captured from one stream of a child process, plus truncation metadata.
#[derive(Debug)]
pub struct StreamOutput<T> {
    /// The retained output (raw bytes or decoded text, depending on `T`).
    pub text: T,
    /// `Some(n)` when the reader reports the output as cut off, where `n` is the number
    /// of newline characters that were retained; `None` otherwise.
    pub truncated_after_lines: Option<u32>,
}
200#[derive(Debug)]
201pub struct RawExecToolCallOutput {
202 pub exit_status: ExitStatus,
203 pub stdout: StreamOutput<Vec<u8>>,
204 pub stderr: StreamOutput<Vec<u8>>,
205}
206
207impl StreamOutput<String> {
208 pub const fn new(text: String) -> Self {
209 Self {
210 text,
211 truncated_after_lines: None,
212 }
213 }
214}
215
216impl StreamOutput<Vec<u8>> {
217 pub fn from_utf8_lossy(&self) -> StreamOutput<String> {
218 StreamOutput {
219 text: String::from_utf8_lossy(&self.text).to_string(),
220 truncated_after_lines: self.truncated_after_lines,
221 }
222 }
223}
224
225#[derive(Debug)]
226pub struct ExecToolCallOutput {
227 pub exit_code: i32,
228 pub stdout: StreamOutput<String>,
229 pub stderr: StreamOutput<String>,
230 pub duration: Duration,
231}
232
233async fn exec(
234 params: ExecParams,
235 sandbox_policy: &SandboxPolicy,
236 stdout_stream: Option<StdoutStream>,
237) -> Result<RawExecToolCallOutput> {
238 let timeout = params.timeout_duration();
239 let ExecParams {
240 command, cwd, env, ..
241 } = params;
242
243 let (program, args) = command.split_first().ok_or_else(|| {
244 CodexErr::Io(io::Error::new(
245 io::ErrorKind::InvalidInput,
246 "command args are empty",
247 ))
248 })?;
249 let arg0 = None;
250 let child = spawn_child_async(
251 PathBuf::from(program),
252 args.into(),
253 arg0,
254 cwd,
255 sandbox_policy,
256 StdioPolicy::RedirectForShellTool,
257 env,
258 )
259 .await?;
260 consume_truncated_output(child, timeout, stdout_stream).await
261}
262
263pub(crate) async fn consume_truncated_output(
266 mut child: Child,
267 timeout: Duration,
268 stdout_stream: Option<StdoutStream>,
269) -> Result<RawExecToolCallOutput> {
270 let stdout_reader = child.stdout.take().ok_or_else(|| {
275 CodexErr::Io(io::Error::other(
276 "stdout pipe was unexpectedly not available",
277 ))
278 })?;
279 let stderr_reader = child.stderr.take().ok_or_else(|| {
280 CodexErr::Io(io::Error::other(
281 "stderr pipe was unexpectedly not available",
282 ))
283 })?;
284
285 let stdout_handle = tokio::spawn(read_capped(
286 BufReader::new(stdout_reader),
287 MAX_STREAM_OUTPUT,
288 MAX_STREAM_OUTPUT_LINES,
289 stdout_stream.clone(),
290 false,
291 ));
292 let stderr_handle = tokio::spawn(read_capped(
293 BufReader::new(stderr_reader),
294 MAX_STREAM_OUTPUT,
295 MAX_STREAM_OUTPUT_LINES,
296 stdout_stream.clone(),
297 true,
298 ));
299
300 let exit_status = tokio::select! {
301 result = tokio::time::timeout(timeout, child.wait()) => {
302 match result {
303 Ok(Ok(exit_status)) => exit_status,
304 Ok(e) => e?,
305 Err(_) => {
306 child.start_kill()?;
308 synthetic_exit_status(128 + TIMEOUT_CODE)
310 }
311 }
312 }
313 _ = tokio::signal::ctrl_c() => {
314 child.start_kill()?;
315 synthetic_exit_status(128 + SIGKILL_CODE)
316 }
317 };
318
319 let stdout = stdout_handle.await??;
320 let stderr = stderr_handle.await??;
321
322 Ok(RawExecToolCallOutput {
323 exit_status,
324 stdout,
325 stderr,
326 })
327}
328
329async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
330 mut reader: R,
331 max_output: usize,
332 max_lines: usize,
333 stream: Option<StdoutStream>,
334 is_stderr: bool,
335) -> io::Result<StreamOutput<Vec<u8>>> {
336 let mut buf = Vec::with_capacity(max_output.min(8 * 1024));
337 let mut tmp = [0u8; 8192];
338
339 let mut remaining_bytes = max_output;
340 let mut remaining_lines = max_lines;
341
342 loop {
343 let n = reader.read(&mut tmp).await?;
344 if n == 0 {
345 break;
346 }
347
348 if let Some(stream) = &stream {
349 let chunk = tmp[..n].to_vec();
350 let msg = EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
351 call_id: stream.call_id.clone(),
352 stream: if is_stderr {
353 ExecOutputStream::Stderr
354 } else {
355 ExecOutputStream::Stdout
356 },
357 chunk: ByteBuf::from(chunk),
358 });
359 let event = Event {
360 id: stream.sub_id.clone(),
361 msg,
362 };
363 #[allow(clippy::let_unit_value)]
364 let _ = stream.tx_event.send(event).await;
365 }
366
367 if remaining_bytes > 0 && remaining_lines > 0 {
369 let mut copy_len = 0;
370 for &b in &tmp[..n] {
371 if remaining_bytes == 0 || remaining_lines == 0 {
372 break;
373 }
374 copy_len += 1;
375 remaining_bytes -= 1;
376 if b == b'\n' {
377 remaining_lines -= 1;
378 }
379 }
380 buf.extend_from_slice(&tmp[..copy_len]);
381 }
382 }
384
385 let truncated = remaining_lines == 0 || remaining_bytes == 0;
386
387 Ok(StreamOutput {
388 text: buf,
389 truncated_after_lines: if truncated {
390 Some((max_lines - remaining_lines) as u32)
391 } else {
392 None
393 },
394 })
395}
396
/// Builds an [`ExitStatus`] from a raw Unix wait status.
#[cfg(unix)]
fn synthetic_exit_status(code: i32) -> ExitStatus {
    <ExitStatus as std::os::unix::process::ExitStatusExt>::from_raw(code)
}
402
/// Builds an [`ExitStatus`] from a raw Windows exit code.
///
/// # Panics
/// Panics when `code` is negative, because it cannot be converted to `u32`.
#[cfg(windows)]
fn synthetic_exit_status(code: i32) -> ExitStatus {
    use std::os::windows::process::ExitStatusExt;
    #[expect(clippy::unwrap_used)]
    let raw: u32 = code.try_into().unwrap();
    ExitStatus::from_raw(raw)
}