apiari_codex_sdk/
transport.rs1use crate::error::{Result, SdkError};
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::{Child, ChildStderr, ChildStdout, Command};
10use tracing::{debug, warn};
11
/// A transport that spawns a CLI child process and only ever reads from it.
///
/// The child's stdin is closed at spawn time; stdout is consumed line by line
/// through `recv`, and stderr is drained by a background task.
pub struct ReadOnlyTransport {
    /// Handle to the spawned CLI process (used to signal, kill and wait).
    child: Child,
    /// Buffered reader over the child's piped stdout; `recv` reads lines from it.
    stdout_reader: BufReader<ChildStdout>,
    /// Scratch buffer reused for every stdout line so each read does not
    /// allocate a fresh `String`.
    line_buf: String,
    /// Background task that drains the child's stderr and resolves to the
    /// accumulated text. `None` when no stderr handle was available at spawn
    /// time or after `wait_with_stderr` has already taken the handle.
    stderr_task: Option<tokio::task::JoinHandle<String>>,
}
24
25impl ReadOnlyTransport {
26 pub fn spawn(
36 cli_path: &str,
37 subcommand_parts: &[&str],
38 extra_args: &[String],
39 prompt: Option<&str>,
40 working_dir: Option<&std::path::Path>,
41 env_vars: &[(String, String)],
42 ) -> Result<Self> {
43 let mut cmd = Command::new(cli_path);
44
45 cmd.args(subcommand_parts);
47
48 cmd.arg("--json");
50 cmd.arg("--skip-git-repo-check");
51
52 cmd.args(extra_args);
54
55 if let Some(prompt) = prompt {
57 cmd.arg(prompt);
58 }
59
60 cmd.env_remove("CLAUDECODE");
63
64 if let Some(dir) = working_dir {
66 cmd.current_dir(dir);
67 }
68
69 for (key, value) in env_vars {
71 cmd.env(key, value);
72 }
73
74 cmd.stdin(std::process::Stdio::null());
76 cmd.stdout(std::process::Stdio::piped());
77 cmd.stderr(std::process::Stdio::piped());
78
79 let mut child = cmd.spawn().map_err(SdkError::ProcessSpawn)?;
80
81 let stdout = child
82 .stdout
83 .take()
84 .expect("stdout was configured as piped but is None");
85 let stderr = child.stderr.take();
86
87 let stdout_reader = BufReader::new(stdout);
88
89 let stderr_task = stderr.map(|se| tokio::spawn(drain_stderr(se)));
91
92 Ok(Self {
93 child,
94 stdout_reader,
95 line_buf: String::with_capacity(4096),
96 stderr_task,
97 })
98 }
99
100 pub async fn recv(&mut self) -> Result<Option<serde_json::Value>> {
109 loop {
110 self.line_buf.clear();
111 let n = self.stdout_reader.read_line(&mut self.line_buf).await?;
112 if n == 0 {
113 return Ok(None); }
115
116 let line = self.line_buf.trim();
117 if line.is_empty() {
118 continue;
120 }
121
122 debug!(line = %line, "stdout <-");
123
124 return serde_json::from_str(line)
125 .map(Some)
126 .map_err(|e| SdkError::InvalidJson {
127 message: e.to_string(),
128 line: line.to_owned(),
129 source: e,
130 });
131 }
132 }
133
134 #[cfg(unix)]
136 pub fn interrupt(&self) -> Result<()> {
137 if let Some(pid) = self.child.id() {
138 let ret = unsafe { libc::kill(pid as libc::pid_t, libc::SIGINT) };
140 if ret != 0 {
141 return Err(SdkError::Io(std::io::Error::last_os_error()));
142 }
143 }
144 Ok(())
145 }
146
147 #[cfg(not(unix))]
149 pub fn interrupt(&self) -> Result<()> {
150 Err(SdkError::ProtocolError(
151 "interrupt is not supported on this platform".to_owned(),
152 ))
153 }
154
155 pub async fn kill(&mut self) -> Result<()> {
157 self.child.kill().await.map_err(SdkError::Io)
158 }
159
160 pub async fn wait_with_stderr(&mut self) -> Result<(Option<i32>, Option<String>)> {
162 let status = self.child.wait().await?;
163 let stderr = if let Some(task) = self.stderr_task.take() {
164 task.await.ok()
165 } else {
166 None
167 };
168 Ok((status.code(), stderr))
169 }
170
171 pub fn try_wait(&mut self) -> Result<Option<std::process::ExitStatus>> {
173 self.child.try_wait().map_err(SdkError::Io)
174 }
175}
176
177async fn drain_stderr(stderr: ChildStderr) -> String {
180 let mut reader = BufReader::new(stderr);
181 let mut buf = String::new();
182 let mut accumulated = String::new();
183 loop {
184 buf.clear();
185 match reader.read_line(&mut buf).await {
186 Ok(0) => break, Ok(_) => {
188 let line = buf.trim_end();
189 if !line.is_empty() {
190 warn!(target: "codex_stderr", "{}", line);
191 accumulated.push_str(line);
192 accumulated.push('\n');
193 }
194 }
195 Err(e) => {
196 warn!(target: "codex_stderr", "error reading stderr: {}", e);
197 break;
198 }
199 }
200 }
201 accumulated
202}