codex_wrapper/
streaming.rs1use tokio::io::{AsyncBufReadExt, BufReader};
26use tokio::process::Command;
27use tracing::debug;
28
29use crate::Codex;
30use crate::command::CodexCommand;
31use crate::error::{Error, Result};
32use crate::types::JsonLineEvent;
33
34pub async fn stream_exec<F>(
40 codex: &Codex,
41 cmd: &crate::command::exec::ExecCommand,
42 handler: F,
43) -> Result<()>
44where
45 F: FnMut(JsonLineEvent),
46{
47 let mut args = cmd.args();
48 if !args.contains(&"--json".to_string()) {
49 args.push("--json".into());
50 }
51 run_streaming(codex, args, handler).await
52}
53
54pub async fn stream_exec_resume<F>(
57 codex: &Codex,
58 cmd: &crate::command::exec::ExecResumeCommand,
59 handler: F,
60) -> Result<()>
61where
62 F: FnMut(JsonLineEvent),
63{
64 let mut args = cmd.args();
65 if !args.contains(&"--json".to_string()) {
66 args.push("--json".into());
67 }
68 run_streaming(codex, args, handler).await
69}
70
71async fn run_streaming<F>(codex: &Codex, args: Vec<String>, mut handler: F) -> Result<()>
73where
74 F: FnMut(JsonLineEvent),
75{
76 let mut command_args = Vec::new();
77 command_args.extend(codex.global_args.clone());
78 command_args.extend(args);
79
80 debug!(binary = %codex.binary.display(), args = ?command_args, "streaming codex command");
81
82 let mut child_cmd = Command::new(&codex.binary);
83 child_cmd.args(&command_args);
84 child_cmd.stdin(std::process::Stdio::null());
85 child_cmd.stdout(std::process::Stdio::piped());
86 child_cmd.stderr(std::process::Stdio::piped());
87
88 if let Some(dir) = &codex.working_dir {
89 child_cmd.current_dir(dir);
90 }
91 for (key, value) in &codex.env {
92 child_cmd.env(key, value);
93 }
94
95 let mut child = child_cmd.spawn().map_err(|e| Error::Io {
96 message: format!("failed to spawn codex: {e}"),
97 source: e,
98 working_dir: codex.working_dir.clone(),
99 })?;
100
101 let stdout = child.stdout.take().expect("stdout was configured as piped");
102 let stderr = child.stderr.take().expect("stderr was configured as piped");
103
104 let stdout_task = async {
105 let reader = BufReader::new(stdout);
106 let mut lines = reader.lines();
107 let mut events = Vec::new();
108 while let Some(line) = lines.next_line().await.map_err(|e| Error::Io {
109 message: format!("failed to read stdout line: {e}"),
110 source: e,
111 working_dir: codex.working_dir.clone(),
112 })? {
113 if line.trim_start().starts_with('{') {
114 match serde_json::from_str::<JsonLineEvent>(&line) {
115 Ok(event) => events.push(event),
116 Err(source) => {
117 return Err(Error::Json {
118 message: format!("failed to parse JSONL event: {line}"),
119 source,
120 });
121 }
122 }
123 }
124 }
125 Ok::<Vec<JsonLineEvent>, Error>(events)
126 };
127
128 let stderr_task = async {
129 let reader = BufReader::new(stderr);
130 let mut lines = reader.lines();
131 let mut collected = String::new();
132 while let Some(line) = lines.next_line().await.map_err(|e| Error::Io {
133 message: format!("failed to read stderr line: {e}"),
134 source: e,
135 working_dir: codex.working_dir.clone(),
136 })? {
137 if !collected.is_empty() {
138 collected.push('\n');
139 }
140 collected.push_str(&line);
141 }
142 Ok::<String, Error>(collected)
143 };
144
145 let stream_future = async {
146 let (events_result, stderr_result) = tokio::join!(stdout_task, stderr_task);
147 let events = events_result?;
148 let stderr_output = stderr_result?;
149
150 for event in events {
151 handler(event);
152 }
153
154 let status = child.wait().await.map_err(|e| Error::Io {
155 message: format!("failed to wait on codex process: {e}"),
156 source: e,
157 working_dir: codex.working_dir.clone(),
158 })?;
159
160 let exit_code = status.code().unwrap_or(-1);
161 if !status.success() {
162 return Err(Error::CommandFailed {
163 command: format!("{} {}", codex.binary.display(), command_args.join(" ")),
164 exit_code,
165 stdout: String::new(),
166 stderr: stderr_output,
167 working_dir: codex.working_dir.clone(),
168 });
169 }
170
171 Ok(())
172 };
173
174 if let Some(timeout) = codex.timeout {
175 tokio::time::timeout(timeout, stream_future)
176 .await
177 .map_err(|_| Error::Timeout {
178 timeout_seconds: timeout.as_secs(),
179 })?
180 } else {
181 stream_future.await
182 }
183}
184
185#[cfg(all(test, unix))]
186mod tests {
187 use super::*;
188 use std::sync::{Arc, Mutex};
189
190 fn fake_codex(script_name: &str) -> Codex {
192 let script = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
193 .join("tests")
194 .join(script_name);
195 Codex::builder()
196 .binary("/bin/bash")
197 .arg(script.to_str().unwrap())
198 .build()
199 .expect("bash must exist")
200 }
201
202 #[tokio::test]
203 async fn stream_exec_delivers_events() {
204 let codex = fake_codex("fake-codex.sh");
205 let cmd = crate::command::exec::ExecCommand::new("test prompt").json();
206 let events = Arc::new(Mutex::new(Vec::new()));
207 let events_clone = Arc::clone(&events);
208
209 stream_exec(&codex, &cmd, move |event| {
210 events_clone.lock().unwrap().push(event);
211 })
212 .await
213 .unwrap();
214
215 let events = events.lock().unwrap();
216 assert!(!events.is_empty(), "expected at least one event");
217
218 let types: Vec<&str> = events.iter().map(|e| e.event_type.as_str()).collect();
219 assert!(
220 types.contains(&"thread.started"),
221 "expected thread.started, got: {types:?}"
222 );
223 assert!(
224 types.contains(&"completed"),
225 "expected completed, got: {types:?}"
226 );
227 }
228
229 #[tokio::test]
230 async fn stream_exec_resume_delivers_events() {
231 let codex = fake_codex("fake-codex.sh");
232 let cmd = crate::command::exec::ExecResumeCommand::new().last().json();
233 let events = Arc::new(Mutex::new(Vec::new()));
234 let events_clone = Arc::clone(&events);
235
236 stream_exec_resume(&codex, &cmd, move |event| {
237 events_clone.lock().unwrap().push(event);
238 })
239 .await
240 .unwrap();
241
242 let events = events.lock().unwrap();
243 assert!(!events.is_empty(), "expected at least one event");
244 }
245
246 #[tokio::test]
247 async fn stream_exec_timeout() {
248 let codex = Codex::builder()
249 .binary("/bin/bash")
250 .arg("-c")
251 .arg("sleep 10")
252 .timeout(std::time::Duration::from_millis(50))
253 .build()
254 .unwrap();
255
256 let cmd = crate::command::exec::ExecCommand::new("test").json();
257 let result = stream_exec(&codex, &cmd, |_| {}).await;
258
259 assert!(
260 matches!(result, Err(Error::Timeout { .. })),
261 "expected timeout error, got: {result:?}"
262 );
263 }
264
265 #[tokio::test]
266 async fn stream_exec_parse_error() {
267 let codex = fake_codex("fake-codex-bad-json.sh");
268 let cmd = crate::command::exec::ExecCommand::new("test").json();
269 let result = stream_exec(&codex, &cmd, |_| {}).await;
270
271 assert!(
272 matches!(result, Err(Error::Json { .. })),
273 "expected json parse error, got: {result:?}"
274 );
275 }
276}