apiari_codex_sdk/
transport.rs1use crate::error::{Result, SdkError};
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::{Child, ChildStderr, ChildStdout, Command};
10use tracing::{debug, warn};
11
12pub struct ReadOnlyTransport {
17 child: Child,
18 stdout_reader: BufReader<ChildStdout>,
19 line_buf: String,
21 stderr_task: Option<tokio::task::JoinHandle<String>>,
23}
24
25impl ReadOnlyTransport {
26 pub fn spawn(
35 cli_path: &str,
36 subcommand: &str,
37 extra_args: &[String],
38 prompt: Option<&str>,
39 working_dir: Option<&std::path::Path>,
40 env_vars: &[(String, String)],
41 ) -> Result<Self> {
42 let mut cmd = Command::new(cli_path);
43
44 cmd.arg(subcommand);
46
47 cmd.arg("--json");
49
50 cmd.args(extra_args);
52
53 if let Some(prompt) = prompt {
55 cmd.arg(prompt);
56 }
57
58 cmd.env_remove("CLAUDECODE");
61
62 if let Some(dir) = working_dir {
64 cmd.current_dir(dir);
65 }
66
67 for (key, value) in env_vars {
69 cmd.env(key, value);
70 }
71
72 cmd.stdin(std::process::Stdio::null());
74 cmd.stdout(std::process::Stdio::piped());
75 cmd.stderr(std::process::Stdio::piped());
76
77 let mut child = cmd.spawn().map_err(SdkError::ProcessSpawn)?;
78
79 let stdout = child
80 .stdout
81 .take()
82 .expect("stdout was configured as piped but is None");
83 let stderr = child.stderr.take();
84
85 let stdout_reader = BufReader::new(stdout);
86
87 let stderr_task = stderr.map(|se| tokio::spawn(drain_stderr(se)));
89
90 Ok(Self {
91 child,
92 stdout_reader,
93 line_buf: String::with_capacity(4096),
94 stderr_task,
95 })
96 }
97
98 pub async fn recv(&mut self) -> Result<Option<serde_json::Value>> {
107 loop {
108 self.line_buf.clear();
109 let n = self.stdout_reader.read_line(&mut self.line_buf).await?;
110 if n == 0 {
111 return Ok(None); }
113
114 let line = self.line_buf.trim();
115 if line.is_empty() {
116 continue;
118 }
119
120 debug!(line = %line, "stdout <-");
121
122 return serde_json::from_str(line)
123 .map(Some)
124 .map_err(|e| SdkError::InvalidJson {
125 message: e.to_string(),
126 line: line.to_owned(),
127 source: e,
128 });
129 }
130 }
131
132 #[cfg(unix)]
134 pub fn interrupt(&self) -> Result<()> {
135 if let Some(pid) = self.child.id() {
136 let ret = unsafe { libc::kill(pid as libc::pid_t, libc::SIGINT) };
138 if ret != 0 {
139 return Err(SdkError::Io(std::io::Error::last_os_error()));
140 }
141 }
142 Ok(())
143 }
144
145 #[cfg(not(unix))]
147 pub fn interrupt(&self) -> Result<()> {
148 Err(SdkError::ProtocolError(
149 "interrupt is not supported on this platform".to_owned(),
150 ))
151 }
152
153 pub async fn kill(&mut self) -> Result<()> {
155 self.child.kill().await.map_err(SdkError::Io)
156 }
157
158 pub async fn wait_with_stderr(&mut self) -> Result<(Option<i32>, Option<String>)> {
160 let status = self.child.wait().await?;
161 let stderr = if let Some(task) = self.stderr_task.take() {
162 task.await.ok()
163 } else {
164 None
165 };
166 Ok((status.code(), stderr))
167 }
168
169 pub fn try_wait(&mut self) -> Result<Option<std::process::ExitStatus>> {
171 self.child.try_wait().map_err(SdkError::Io)
172 }
173}
174
175async fn drain_stderr(stderr: ChildStderr) -> String {
178 let mut reader = BufReader::new(stderr);
179 let mut buf = String::new();
180 let mut accumulated = String::new();
181 loop {
182 buf.clear();
183 match reader.read_line(&mut buf).await {
184 Ok(0) => break, Ok(_) => {
186 let line = buf.trim_end();
187 if !line.is_empty() {
188 warn!(target: "codex_stderr", "{}", line);
189 accumulated.push_str(line);
190 accumulated.push('\n');
191 }
192 }
193 Err(e) => {
194 warn!(target: "codex_stderr", "error reading stderr: {}", e);
195 break;
196 }
197 }
198 }
199 accumulated
200}