1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::process::Stdio;
4
5use async_stream::try_stream;
6use futures::stream::BoxStream;
7use serde_json::Value;
8use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
9use tokio::process::{Child, ChildStdout, Command};
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use crate::codex_options::CodexConfigObject;
14use crate::errors::{Error, Result};
15use crate::thread_options::{ApprovalMode, ModelReasoningEffort, SandboxMode, WebSearchMode};
16
17const INTERNAL_ORIGINATOR_ENV: &str = "CODEX_INTERNAL_ORIGINATOR_OVERRIDE";
18const RUST_SDK_ORIGINATOR: &str = "codex_sdk_rust";
19
20#[derive(Debug, Clone, Default)]
22pub struct CodexExecArgs {
23 pub input: String,
25 pub base_url: Option<String>,
27 pub api_key: Option<String>,
29 pub thread_id: Option<String>,
31 pub images: Vec<String>,
33 pub model: Option<String>,
35 pub sandbox_mode: Option<SandboxMode>,
37 pub working_directory: Option<String>,
39 pub additional_directories: Vec<String>,
41 pub skip_git_repo_check: bool,
43 pub output_schema_file: Option<String>,
45 pub model_reasoning_effort: Option<ModelReasoningEffort>,
47 pub network_access_enabled: Option<bool>,
49 pub web_search_mode: Option<WebSearchMode>,
51 pub web_search_enabled: Option<bool>,
53 pub approval_policy: Option<ApprovalMode>,
55 pub cancellation_token: Option<CancellationToken>,
57}
58
59#[derive(Debug, Clone)]
61pub struct CodexExec {
62 executable_path: String,
63 env_override: Option<HashMap<String, String>>,
64 config_overrides: Option<CodexConfigObject>,
65}
66
67impl CodexExec {
68 pub fn new(
82 executable_path_override: Option<String>,
83 env_override: Option<HashMap<String, String>>,
84 config_overrides: Option<CodexConfigObject>,
85 ) -> Result<Self> {
86 let executable_path = match executable_path_override {
87 Some(path) => path,
88 None => find_codex_path()?,
89 };
90
91 Ok(Self {
92 executable_path,
93 env_override,
94 config_overrides,
95 })
96 }
97
98 pub async fn run(&self, args: CodexExecArgs) -> Result<BoxStream<'static, Result<String>>> {
125 if args
126 .cancellation_token
127 .as_ref()
128 .is_some_and(CancellationToken::is_cancelled)
129 {
130 return Err(Error::Cancelled);
131 }
132
133 let command_args = self.build_command_args(&args)?;
134
135 let mut command = Command::new(&self.executable_path);
136 command
137 .args(&command_args)
138 .stdin(Stdio::piped())
139 .stdout(Stdio::piped())
140 .stderr(Stdio::piped())
141 .kill_on_drop(true);
142 command.env_clear();
143 command.envs(build_env(&self.env_override, &args));
144
145 let mut child = command
146 .spawn()
147 .map_err(|e| Error::Spawn(format!("{} ({e})", self.executable_path)))?;
148
149 let mut stdin = child
150 .stdin
151 .take()
152 .ok_or_else(|| Error::Spawn("child process has no stdin".to_string()))?;
153 stdin.write_all(args.input.as_bytes()).await?;
154 stdin.shutdown().await?;
155
156 let stdout = child
157 .stdout
158 .take()
159 .ok_or_else(|| Error::Spawn("child process has no stdout".to_string()))?;
160 let stderr = child.stderr.take();
161
162 let mut lines = BufReader::new(stdout).lines();
163 let cancellation_token = args.cancellation_token;
164 let mut stderr_task = Some(spawn_stderr_reader(stderr));
165
166 let output = try_stream! {
167 loop {
168 let next = next_line_or_cancel(
169 &mut lines,
170 cancellation_token.as_ref(),
171 &mut child,
172 &mut stderr_task,
173 ).await?;
174
175 match next {
176 Some(line) => yield line,
177 None => break,
178 }
179 }
180
181 let status = child.wait().await?;
182 let stderr = take_stderr(&mut stderr_task).await;
183
184 if !status.success() {
185 let detail = match status.code() {
186 Some(code) => format!("code {code}"),
187 None => "signal termination".to_string(),
188 };
189 Err(Error::Process {
190 detail,
191 stderr,
192 code: status.code(),
193 })?;
194 }
195 };
196
197 Ok(Box::pin(output))
198 }
199
200 fn build_command_args(&self, args: &CodexExecArgs) -> Result<Vec<String>> {
201 let mut command_args = vec!["exec".to_string(), "--experimental-json".to_string()];
202
203 if let Some(config_overrides) = &self.config_overrides {
204 for override_value in serialize_config_overrides(config_overrides)? {
205 command_args.push("--config".to_string());
206 command_args.push(override_value);
207 }
208 }
209
210 if let Some(model) = &args.model {
211 command_args.push("--model".to_string());
212 command_args.push(model.clone());
213 }
214
215 if let Some(sandbox_mode) = args.sandbox_mode {
216 command_args.push("--sandbox".to_string());
217 command_args.push(sandbox_mode_to_str(sandbox_mode).to_string());
218 }
219
220 if let Some(working_directory) = &args.working_directory {
221 command_args.push("--cd".to_string());
222 command_args.push(working_directory.clone());
223 }
224
225 for dir in &args.additional_directories {
226 command_args.push("--add-dir".to_string());
227 command_args.push(dir.clone());
228 }
229
230 if args.skip_git_repo_check {
231 command_args.push("--skip-git-repo-check".to_string());
232 }
233
234 if let Some(output_schema_file) = &args.output_schema_file {
235 command_args.push("--output-schema".to_string());
236 command_args.push(output_schema_file.clone());
237 }
238
239 if let Some(reasoning_effort) = args.model_reasoning_effort {
240 command_args.push("--config".to_string());
241 command_args.push(format!(
242 "model_reasoning_effort=\"{}\"",
243 model_reasoning_effort_to_str(reasoning_effort)
244 ));
245 }
246
247 if let Some(network_access_enabled) = args.network_access_enabled {
248 command_args.push("--config".to_string());
249 command_args.push(format!(
250 "sandbox_workspace_write.network_access={network_access_enabled}"
251 ));
252 }
253
254 if let Some(web_search_mode) = args.web_search_mode {
255 command_args.push("--config".to_string());
256 command_args.push(format!(
257 "web_search=\"{}\"",
258 web_search_mode_to_str(web_search_mode)
259 ));
260 } else if let Some(web_search_enabled) = args.web_search_enabled {
261 command_args.push("--config".to_string());
262 let mode = if web_search_enabled {
263 "live"
264 } else {
265 "disabled"
266 };
267 command_args.push(format!("web_search=\"{mode}\""));
268 }
269
270 if let Some(approval_policy) = args.approval_policy {
271 command_args.push("--config".to_string());
272 command_args.push(format!(
273 "approval_policy=\"{}\"",
274 approval_mode_to_str(approval_policy)
275 ));
276 }
277
278 if let Some(thread_id) = &args.thread_id {
279 command_args.push("resume".to_string());
280 command_args.push(thread_id.clone());
281 }
282
283 for image in &args.images {
284 command_args.push("--image".to_string());
285 command_args.push(image.clone());
286 }
287
288 Ok(command_args)
289 }
290}
291
292fn find_codex_path() -> Result<String> {
293 if let Ok(path) = which::which("codex") {
294 return Ok(path.to_string_lossy().into_owned());
295 }
296
297 let cwd = std::env::current_dir().ok();
298 let home = home_dir();
299 if let Some(path) = find_codex_path_from(cwd.as_deref(), home.as_deref()) {
300 return Ok(path);
301 }
302
303 Err(Error::CliNotFound(
304 "codex executable was not found. Checked PATH, local node_modules, platform vendor binaries, and common global install locations. Set codex_path_override or install @openai/codex".to_string(),
305 ))
306}
307
308fn build_env(
309 env_override: &Option<HashMap<String, String>>,
310 args: &CodexExecArgs,
311) -> HashMap<String, String> {
312 let mut env = match env_override {
313 Some(override_map) => override_map.clone(),
314 None => std::env::vars().collect(),
315 };
316
317 env.entry(INTERNAL_ORIGINATOR_ENV.to_string())
318 .or_insert_with(|| RUST_SDK_ORIGINATOR.to_string());
319
320 if let Some(base_url) = &args.base_url {
321 env.insert("OPENAI_BASE_URL".to_string(), base_url.clone());
322 }
323 if let Some(api_key) = &args.api_key {
324 env.insert("CODEX_API_KEY".to_string(), api_key.clone());
325 }
326
327 env
328}
329
330fn find_codex_path_from(start_dir: Option<&Path>, home_dir: Option<&Path>) -> Option<String> {
331 if let Some(start_dir) = start_dir {
332 for dir in start_dir.ancestors() {
333 let local_bin = dir
334 .join("node_modules")
335 .join(".bin")
336 .join(codex_binary_name());
337 if local_bin.is_file() {
338 return Some(local_bin.to_string_lossy().into_owned());
339 }
340
341 if let Some(vendor_path) = local_vendor_binary_path(dir) {
342 return Some(vendor_path.to_string_lossy().into_owned());
343 }
344 }
345 }
346
347 for path in common_global_locations(home_dir) {
348 if path.is_file() {
349 return Some(path.to_string_lossy().into_owned());
350 }
351 }
352
353 None
354}
355
356fn local_vendor_binary_path(base_dir: &Path) -> Option<PathBuf> {
357 let target_triple = platform_target_triple()?;
358 let package = platform_package_for_target(target_triple)?;
359
360 let candidate = base_dir
361 .join("node_modules")
362 .join(package)
363 .join("vendor")
364 .join(target_triple)
365 .join("codex")
366 .join(codex_binary_name());
367
368 if candidate.is_file() {
369 Some(candidate)
370 } else {
371 None
372 }
373}
374
375fn common_global_locations(home_dir: Option<&Path>) -> Vec<PathBuf> {
376 let mut locations = Vec::new();
377 if let Some(home) = home_dir {
378 locations.push(
379 home.join(".npm-global")
380 .join("bin")
381 .join(codex_binary_name()),
382 );
383 locations.push(home.join(".local").join("bin").join(codex_binary_name()));
384 locations.push(
385 home.join("node_modules")
386 .join(".bin")
387 .join(codex_binary_name()),
388 );
389 locations.push(home.join(".yarn").join("bin").join(codex_binary_name()));
390 locations.push(home.join(".codex").join("local").join(codex_binary_name()));
391 }
392 locations.push(PathBuf::from("/usr/local/bin").join(codex_binary_name()));
393 locations
394}
395
396fn codex_binary_name() -> &'static str {
397 if cfg!(windows) { "codex.exe" } else { "codex" }
398}
399
400fn home_dir() -> Option<PathBuf> {
401 #[cfg(windows)]
402 {
403 std::env::var_os("USERPROFILE").map(PathBuf::from)
404 }
405 #[cfg(not(windows))]
406 {
407 std::env::var_os("HOME").map(PathBuf::from)
408 }
409}
410
411fn platform_target_triple() -> Option<&'static str> {
412 match (std::env::consts::OS, std::env::consts::ARCH) {
413 ("linux", "x86_64") => Some("x86_64-unknown-linux-musl"),
414 ("linux", "aarch64") => Some("aarch64-unknown-linux-musl"),
415 ("android", "x86_64") => Some("x86_64-unknown-linux-musl"),
416 ("android", "aarch64") => Some("aarch64-unknown-linux-musl"),
417 ("macos", "x86_64") => Some("x86_64-apple-darwin"),
418 ("macos", "aarch64") => Some("aarch64-apple-darwin"),
419 ("windows", "x86_64") => Some("x86_64-pc-windows-msvc"),
420 ("windows", "aarch64") => Some("aarch64-pc-windows-msvc"),
421 _ => None,
422 }
423}
424
425fn platform_package_for_target(target_triple: &str) -> Option<&'static str> {
426 match target_triple {
427 "x86_64-unknown-linux-musl" => Some("@openai/codex-linux-x64"),
428 "aarch64-unknown-linux-musl" => Some("@openai/codex-linux-arm64"),
429 "x86_64-apple-darwin" => Some("@openai/codex-darwin-x64"),
430 "aarch64-apple-darwin" => Some("@openai/codex-darwin-arm64"),
431 "x86_64-pc-windows-msvc" => Some("@openai/codex-win32-x64"),
432 "aarch64-pc-windows-msvc" => Some("@openai/codex-win32-arm64"),
433 _ => None,
434 }
435}
436
437fn spawn_stderr_reader(stderr: Option<tokio::process::ChildStderr>) -> JoinHandle<String> {
438 tokio::spawn(async move {
439 let mut stderr_buffer = Vec::new();
440 if let Some(mut stderr) = stderr {
441 let _ = stderr.read_to_end(&mut stderr_buffer).await;
442 }
443 String::from_utf8_lossy(&stderr_buffer).into_owned()
444 })
445}
446
447async fn take_stderr(stderr_task: &mut Option<JoinHandle<String>>) -> String {
448 let Some(task) = stderr_task.take() else {
449 return String::new();
450 };
451 (task.await).unwrap_or_default()
452}
453
454async fn next_line_or_cancel(
455 lines: &mut tokio::io::Lines<BufReader<ChildStdout>>,
456 cancellation_token: Option<&CancellationToken>,
457 child: &mut Child,
458 stderr_task: &mut Option<JoinHandle<String>>,
459) -> Result<Option<String>> {
460 match cancellation_token {
461 Some(token) => {
462 tokio::select! {
463 _ = token.cancelled() => {
464 let _ = child.kill().await;
465 let _ = child.wait().await;
466 let _ = take_stderr(stderr_task).await;
467 Err(Error::Cancelled)
468 }
469 line = lines.next_line() => line.map_err(Error::from),
470 }
471 }
472 None => lines.next_line().await.map_err(Error::from),
473 }
474}
475
476fn serialize_config_overrides(config_overrides: &CodexConfigObject) -> Result<Vec<String>> {
477 let mut overrides = Vec::new();
478 flatten_config_overrides(&Value::Object(config_overrides.clone()), "", &mut overrides)?;
479 Ok(overrides)
480}
481
482fn flatten_config_overrides(
483 value: &Value,
484 prefix: &str,
485 overrides: &mut Vec<String>,
486) -> Result<()> {
487 let Some(object) = value.as_object() else {
488 if prefix.is_empty() {
489 return Err(Error::InvalidConfig(
490 "Codex config overrides must be a plain object".to_string(),
491 ));
492 }
493
494 overrides.push(format!("{prefix}={}", to_toml_value(value, prefix)?));
495 return Ok(());
496 };
497
498 if prefix.is_empty() && object.is_empty() {
499 return Ok(());
500 }
501 if !prefix.is_empty() && object.is_empty() {
502 overrides.push(format!("{prefix}={{}}"));
503 return Ok(());
504 }
505
506 for (key, child) in object {
507 if key.is_empty() {
508 return Err(Error::InvalidConfig(
509 "Codex config override keys must be non-empty strings".to_string(),
510 ));
511 }
512
513 let formatted_key = format_toml_key(key);
514 let path = if prefix.is_empty() {
515 formatted_key
516 } else {
517 format!("{prefix}.{formatted_key}")
518 };
519
520 if child.is_object() {
521 flatten_config_overrides(child, &path, overrides)?;
522 } else {
523 overrides.push(format!("{path}={}", to_toml_value(child, &path)?));
524 }
525 }
526
527 Ok(())
528}
529
530fn to_toml_value(value: &Value, path: &str) -> Result<String> {
531 match value {
532 Value::String(s) => Ok(serde_json::to_string(s)?),
533 Value::Number(n) => {
534 if let Some(f) = n.as_f64()
535 && !f.is_finite()
536 {
537 return Err(Error::InvalidConfig(format!(
538 "Codex config override at {path} must be a finite number"
539 )));
540 }
541 Ok(n.to_string())
542 }
543 Value::Bool(b) => Ok(if *b { "true" } else { "false" }.to_string()),
544 Value::Array(items) => {
545 let mut rendered = Vec::with_capacity(items.len());
546 for (index, item) in items.iter().enumerate() {
547 rendered.push(to_toml_value(item, &format!("{path}[{index}]"))?);
548 }
549 Ok(format!("[{}]", rendered.join(", ")))
550 }
551 Value::Object(map) => {
552 let mut parts = Vec::with_capacity(map.len());
553 for (key, child) in map {
554 if key.is_empty() {
555 return Err(Error::InvalidConfig(
556 "Codex config override keys must be non-empty strings".to_string(),
557 ));
558 }
559 let child_value = to_toml_value(child, &format!("{path}.{key}"))?;
560 parts.push(format!("{} = {child_value}", format_toml_key(key)));
561 }
562 Ok(format!("{{{}}}", parts.join(", ")))
563 }
564 Value::Null => Err(Error::InvalidConfig(format!(
565 "Codex config override at {path} cannot be null"
566 ))),
567 }
568}
569
570fn format_toml_key(key: &str) -> String {
571 if is_bare_toml_key(key) {
572 key.to_string()
573 } else {
574 serde_json::to_string(key).unwrap_or_else(|_| "\"\"".to_string())
576 }
577}
578
579fn is_bare_toml_key(key: &str) -> bool {
580 !key.is_empty()
581 && key
582 .chars()
583 .all(|ch| ch.is_ascii_alphanumeric() || ch == '_' || ch == '-')
584}
585
586fn sandbox_mode_to_str(mode: SandboxMode) -> &'static str {
587 match mode {
588 SandboxMode::ReadOnly => "read-only",
589 SandboxMode::WorkspaceWrite => "workspace-write",
590 SandboxMode::DangerFullAccess => "danger-full-access",
591 }
592}
593
594fn model_reasoning_effort_to_str(mode: ModelReasoningEffort) -> &'static str {
595 match mode {
596 ModelReasoningEffort::Minimal => "minimal",
597 ModelReasoningEffort::Low => "low",
598 ModelReasoningEffort::Medium => "medium",
599 ModelReasoningEffort::High => "high",
600 ModelReasoningEffort::XHigh => "xhigh",
601 }
602}
603
604fn web_search_mode_to_str(mode: WebSearchMode) -> &'static str {
605 match mode {
606 WebSearchMode::Disabled => "disabled",
607 WebSearchMode::Cached => "cached",
608 WebSearchMode::Live => "live",
609 }
610}
611
612fn approval_mode_to_str(mode: ApprovalMode) -> &'static str {
613 match mode {
614 ApprovalMode::Never => "never",
615 ApprovalMode::OnRequest => "on-request",
616 ApprovalMode::OnFailure => "on-failure",
617 ApprovalMode::Untrusted => "untrusted",
618 }
619}
620
621#[cfg(test)]
622mod tests {
623 use super::{
624 codex_binary_name, find_codex_path_from, platform_package_for_target,
625 platform_target_triple,
626 };
627
628 #[test]
629 fn finds_codex_in_local_node_modules_bin() {
630 let root = tempfile::tempdir().expect("tempdir");
631 let bin = root.path().join("node_modules").join(".bin");
632 std::fs::create_dir_all(&bin).expect("create bin");
633 let codex = bin.join(codex_binary_name());
634 std::fs::write(&codex, "").expect("create file");
635
636 let nested = root.path().join("packages").join("app");
637 std::fs::create_dir_all(&nested).expect("create nested");
638
639 let found = find_codex_path_from(Some(&nested), None).expect("path");
640 assert_eq!(found, codex.to_string_lossy());
641 }
642
643 #[test]
644 fn finds_codex_in_platform_vendor_package() {
645 let Some(target) = platform_target_triple() else {
646 return;
647 };
648 let Some(package) = platform_package_for_target(target) else {
649 return;
650 };
651
652 let root = tempfile::tempdir().expect("tempdir");
653 let codex = root
654 .path()
655 .join("node_modules")
656 .join(package)
657 .join("vendor")
658 .join(target)
659 .join("codex")
660 .join(codex_binary_name());
661 std::fs::create_dir_all(codex.parent().expect("parent")).expect("mkdir");
662 std::fs::write(&codex, "").expect("write");
663
664 let nested = root.path().join("workspace").join("crate");
665 std::fs::create_dir_all(&nested).expect("nested");
666
667 let found = find_codex_path_from(Some(&nested), None).expect("path");
668 assert_eq!(found, codex.to_string_lossy());
669 }
670
671 #[test]
672 fn finds_codex_in_common_global_location() {
673 let home = tempfile::tempdir().expect("tempdir");
674 let codex = home
675 .path()
676 .join(".npm-global")
677 .join("bin")
678 .join(codex_binary_name());
679 std::fs::create_dir_all(codex.parent().expect("parent")).expect("mkdir");
680 std::fs::write(&codex, "").expect("write");
681
682 let found = find_codex_path_from(None, Some(home.path())).expect("path");
683 assert_eq!(found, codex.to_string_lossy());
684 }
685}