Skip to main content

apiari_codex_sdk/
transport.rs

1//! Read-only NDJSON transport over a subprocess.
2//!
3//! [`ReadOnlyTransport`] wraps a `tokio::process::Child` running the `codex`
4//! CLI and reads JSONL lines from its stdout. Unlike the Claude SDK transport,
5//! stdin is `/dev/null` — codex exec is unidirectional.
6
7use crate::error::{Result, SdkError};
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::{Child, ChildStderr, ChildStdout, Command};
10use tracing::{debug, warn};
11
12/// Read-only NDJSON transport wrapping a `codex` subprocess.
13///
14/// Each line read from stdout is a single JSON object. There is no stdin
15/// writing — the prompt is passed as a CLI argument.
16pub struct ReadOnlyTransport {
17    child: Child,
18    stdout_reader: BufReader<ChildStdout>,
19    /// Buffer reused across reads to avoid allocations.
20    line_buf: String,
21    /// Handle to the stderr reader task.
22    stderr_task: Option<tokio::task::JoinHandle<String>>,
23}
24
25impl ReadOnlyTransport {
26    /// Spawn a new `codex` process.
27    ///
28    /// The process is launched as
29    /// `<cli_path> <subcommand_parts...> --json [extra_args...] [prompt]`.
30    /// Stdin is `/dev/null` — there is no send method.
31    ///
32    /// # Errors
33    ///
34    /// Returns [`SdkError::ProcessSpawn`] if the process cannot be started.
35    pub fn spawn(
36        cli_path: &str,
37        subcommand_parts: &[&str],
38        extra_args: &[String],
39        prompt: Option<&str>,
40        working_dir: Option<&std::path::Path>,
41        env_vars: &[(String, String)],
42    ) -> Result<Self> {
43        let mut cmd = Command::new(cli_path);
44
45        // Subcommand path (e.g. ["exec"] or ["exec", "resume"]).
46        cmd.args(subcommand_parts);
47
48        // Always request JSON output.
49        cmd.arg("--json");
50        cmd.arg("--skip-git-repo-check");
51
52        // Caller-supplied arguments (model, sandbox, etc.).
53        cmd.args(extra_args);
54
55        // Prompt as the final positional argument.
56        if let Some(prompt) = prompt {
57            cmd.arg(prompt);
58        }
59
60        // Clear the CLAUDECODE environment variable to allow the SDK to spawn
61        // codex from within a Claude Code agent session.
62        cmd.env_remove("CLAUDECODE");
63
64        // Working directory.
65        if let Some(dir) = working_dir {
66            cmd.current_dir(dir);
67        }
68
69        // Environment variables.
70        for (key, value) in env_vars {
71            cmd.env(key, value);
72        }
73
74        // stdin is null — codex exec is unidirectional.
75        cmd.stdin(std::process::Stdio::null());
76        cmd.stdout(std::process::Stdio::piped());
77        cmd.stderr(std::process::Stdio::piped());
78
79        let mut child = cmd.spawn().map_err(SdkError::ProcessSpawn)?;
80
81        let stdout = child
82            .stdout
83            .take()
84            .expect("stdout was configured as piped but is None");
85        let stderr = child.stderr.take();
86
87        let stdout_reader = BufReader::new(stdout);
88
89        // Spawn a background task to drain stderr so it doesn't block.
90        let stderr_task = stderr.map(|se| tokio::spawn(drain_stderr(se)));
91
92        Ok(Self {
93            child,
94            stdout_reader,
95            line_buf: String::with_capacity(4096),
96            stderr_task,
97        })
98    }
99
100    /// Read the next NDJSON line from stdout and parse it as a JSON value.
101    ///
102    /// Returns `Ok(None)` when stdout reaches EOF (process exited).
103    ///
104    /// # Errors
105    ///
106    /// Returns [`SdkError::InvalidJson`] if a line is not valid JSON.
107    /// Returns [`SdkError::Io`] on read failure.
108    pub async fn recv(&mut self) -> Result<Option<serde_json::Value>> {
109        loop {
110            self.line_buf.clear();
111            let n = self.stdout_reader.read_line(&mut self.line_buf).await?;
112            if n == 0 {
113                return Ok(None); // EOF
114            }
115
116            let line = self.line_buf.trim();
117            if line.is_empty() {
118                // Skip blank lines and try the next one.
119                continue;
120            }
121
122            debug!(line = %line, "stdout <-");
123
124            return serde_json::from_str(line)
125                .map(Some)
126                .map_err(|e| SdkError::InvalidJson {
127                    message: e.to_string(),
128                    line: line.to_owned(),
129                    source: e,
130                });
131        }
132    }
133
134    /// Send an interrupt signal (SIGINT on Unix) to the subprocess.
135    #[cfg(unix)]
136    pub fn interrupt(&self) -> Result<()> {
137        if let Some(pid) = self.child.id() {
138            // Safety: sending SIGINT to a known child PID.
139            let ret = unsafe { libc::kill(pid as libc::pid_t, libc::SIGINT) };
140            if ret != 0 {
141                return Err(SdkError::Io(std::io::Error::last_os_error()));
142            }
143        }
144        Ok(())
145    }
146
147    /// Send an interrupt signal on non-Unix platforms (not supported).
148    #[cfg(not(unix))]
149    pub fn interrupt(&self) -> Result<()> {
150        Err(SdkError::ProtocolError(
151            "interrupt is not supported on this platform".to_owned(),
152        ))
153    }
154
155    /// Kill the subprocess immediately (SIGKILL on Unix).
156    pub async fn kill(&mut self) -> Result<()> {
157        self.child.kill().await.map_err(SdkError::Io)
158    }
159
160    /// Wait for the subprocess to exit and return the exit code and captured stderr.
161    pub async fn wait_with_stderr(&mut self) -> Result<(Option<i32>, Option<String>)> {
162        let status = self.child.wait().await?;
163        let stderr = if let Some(task) = self.stderr_task.take() {
164            task.await.ok()
165        } else {
166            None
167        };
168        Ok((status.code(), stderr))
169    }
170
171    /// Check whether the child process has exited without blocking.
172    pub fn try_wait(&mut self) -> Result<Option<std::process::ExitStatus>> {
173        self.child.try_wait().map_err(SdkError::Io)
174    }
175}
176
177/// Background task that drains stderr line by line, logging each line,
178/// and returns the accumulated output.
179async fn drain_stderr(stderr: ChildStderr) -> String {
180    let mut reader = BufReader::new(stderr);
181    let mut buf = String::new();
182    let mut accumulated = String::new();
183    loop {
184        buf.clear();
185        match reader.read_line(&mut buf).await {
186            Ok(0) => break, // EOF
187            Ok(_) => {
188                let line = buf.trim_end();
189                if !line.is_empty() {
190                    warn!(target: "codex_stderr", "{}", line);
191                    accumulated.push_str(line);
192                    accumulated.push('\n');
193                }
194            }
195            Err(e) => {
196                warn!(target: "codex_stderr", "error reading stderr: {}", e);
197                break;
198            }
199        }
200    }
201    accumulated
202}