Skip to main content

codex_wrapper/
streaming.rs

1//! Streaming execution for `codex exec` commands.
2//!
3//! Instead of buffering all JSONL output and returning it at once,
4//! the streaming API pipes stdout from the child process and delivers
5//! each [`JsonLineEvent`] to a caller-supplied callback as soon as it
6//! arrives.
7//!
8//! # Example
9//!
10//! ```no_run
11//! use codex_wrapper::{Codex, ExecCommand, JsonLineEvent};
12//!
13//! # async fn example() -> codex_wrapper::Result<()> {
14//! let codex = Codex::builder().build()?;
15//! let cmd = ExecCommand::new("what is 2+2?").ephemeral();
16//!
17//! cmd.stream(&codex, |event: JsonLineEvent| {
18//!     println!("{}: {:?}", event.event_type, event.extra);
19//! })
20//! .await?;
21//! # Ok(())
22//! # }
23//! ```
24
25use tokio::io::{AsyncBufReadExt, BufReader};
26use tokio::process::Command;
27use tracing::debug;
28
29use crate::Codex;
30use crate::command::CodexCommand;
31use crate::error::{Error, Result};
32use crate::types::JsonLineEvent;
33
34/// Stream JSONL events from `codex exec <prompt>`, invoking `handler` for each
35/// parsed [`JsonLineEvent`].
36///
37/// The child's stderr is drained concurrently and returned in the error if the
38/// process exits with a non-zero status.
39pub async fn stream_exec<F>(
40    codex: &Codex,
41    cmd: &crate::command::exec::ExecCommand,
42    handler: F,
43) -> Result<()>
44where
45    F: FnMut(JsonLineEvent),
46{
47    let mut args = cmd.args();
48    if !args.contains(&"--json".to_string()) {
49        args.push("--json".into());
50    }
51    run_streaming(codex, args, handler).await
52}
53
54/// Stream JSONL events from `codex exec resume`, invoking `handler` for each
55/// parsed [`JsonLineEvent`].
56pub async fn stream_exec_resume<F>(
57    codex: &Codex,
58    cmd: &crate::command::exec::ExecResumeCommand,
59    handler: F,
60) -> Result<()>
61where
62    F: FnMut(JsonLineEvent),
63{
64    let mut args = cmd.args();
65    if !args.contains(&"--json".to_string()) {
66        args.push("--json".into());
67    }
68    run_streaming(codex, args, handler).await
69}
70
71/// Core streaming implementation shared by both exec variants.
72async fn run_streaming<F>(codex: &Codex, args: Vec<String>, mut handler: F) -> Result<()>
73where
74    F: FnMut(JsonLineEvent),
75{
76    let mut command_args = Vec::new();
77    command_args.extend(codex.global_args.clone());
78    command_args.extend(args);
79
80    debug!(binary = %codex.binary.display(), args = ?command_args, "streaming codex command");
81
82    let mut child_cmd = Command::new(&codex.binary);
83    child_cmd.args(&command_args);
84    child_cmd.stdin(std::process::Stdio::null());
85    child_cmd.stdout(std::process::Stdio::piped());
86    child_cmd.stderr(std::process::Stdio::piped());
87
88    if let Some(dir) = &codex.working_dir {
89        child_cmd.current_dir(dir);
90    }
91    for (key, value) in &codex.env {
92        child_cmd.env(key, value);
93    }
94
95    let mut child = child_cmd.spawn().map_err(|e| Error::Io {
96        message: format!("failed to spawn codex: {e}"),
97        source: e,
98        working_dir: codex.working_dir.clone(),
99    })?;
100
101    let stdout = child.stdout.take().expect("stdout was configured as piped");
102    let stderr = child.stderr.take().expect("stderr was configured as piped");
103
104    let stdout_task = async {
105        let reader = BufReader::new(stdout);
106        let mut lines = reader.lines();
107        let mut events = Vec::new();
108        while let Some(line) = lines.next_line().await.map_err(|e| Error::Io {
109            message: format!("failed to read stdout line: {e}"),
110            source: e,
111            working_dir: codex.working_dir.clone(),
112        })? {
113            if line.trim_start().starts_with('{') {
114                match serde_json::from_str::<JsonLineEvent>(&line) {
115                    Ok(event) => events.push(event),
116                    Err(source) => {
117                        return Err(Error::Json {
118                            message: format!("failed to parse JSONL event: {line}"),
119                            source,
120                        });
121                    }
122                }
123            }
124        }
125        Ok::<Vec<JsonLineEvent>, Error>(events)
126    };
127
128    let stderr_task = async {
129        let reader = BufReader::new(stderr);
130        let mut lines = reader.lines();
131        let mut collected = String::new();
132        while let Some(line) = lines.next_line().await.map_err(|e| Error::Io {
133            message: format!("failed to read stderr line: {e}"),
134            source: e,
135            working_dir: codex.working_dir.clone(),
136        })? {
137            if !collected.is_empty() {
138                collected.push('\n');
139            }
140            collected.push_str(&line);
141        }
142        Ok::<String, Error>(collected)
143    };
144
145    let stream_future = async {
146        let (events_result, stderr_result) = tokio::join!(stdout_task, stderr_task);
147        let events = events_result?;
148        let stderr_output = stderr_result?;
149
150        for event in events {
151            handler(event);
152        }
153
154        let status = child.wait().await.map_err(|e| Error::Io {
155            message: format!("failed to wait on codex process: {e}"),
156            source: e,
157            working_dir: codex.working_dir.clone(),
158        })?;
159
160        let exit_code = status.code().unwrap_or(-1);
161        if !status.success() {
162            return Err(Error::CommandFailed {
163                command: format!("{} {}", codex.binary.display(), command_args.join(" ")),
164                exit_code,
165                stdout: String::new(),
166                stderr: stderr_output,
167                working_dir: codex.working_dir.clone(),
168            });
169        }
170
171        Ok(())
172    };
173
174    if let Some(timeout) = codex.timeout {
175        tokio::time::timeout(timeout, stream_future)
176            .await
177            .map_err(|_| Error::Timeout {
178                timeout_seconds: timeout.as_secs(),
179            })?
180    } else {
181        stream_future.await
182    }
183}
184
185#[cfg(all(test, unix))]
186mod tests {
187    use super::*;
188    use std::sync::{Arc, Mutex};
189
190    /// Build a [`Codex`] client that uses `bash` to run the fake-codex script.
191    fn fake_codex(script_name: &str) -> Codex {
192        let script = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
193            .join("tests")
194            .join(script_name);
195        Codex::builder()
196            .binary("/bin/bash")
197            .arg(script.to_str().unwrap())
198            .build()
199            .expect("bash must exist")
200    }
201
202    #[tokio::test]
203    async fn stream_exec_delivers_events() {
204        let codex = fake_codex("fake-codex.sh");
205        let cmd = crate::command::exec::ExecCommand::new("test prompt").json();
206        let events = Arc::new(Mutex::new(Vec::new()));
207        let events_clone = Arc::clone(&events);
208
209        stream_exec(&codex, &cmd, move |event| {
210            events_clone.lock().unwrap().push(event);
211        })
212        .await
213        .unwrap();
214
215        let events = events.lock().unwrap();
216        assert!(!events.is_empty(), "expected at least one event");
217
218        let types: Vec<&str> = events.iter().map(|e| e.event_type.as_str()).collect();
219        assert!(
220            types.contains(&"thread.started"),
221            "expected thread.started, got: {types:?}"
222        );
223        assert!(
224            types.contains(&"completed"),
225            "expected completed, got: {types:?}"
226        );
227    }
228
229    #[tokio::test]
230    async fn stream_exec_resume_delivers_events() {
231        let codex = fake_codex("fake-codex.sh");
232        let cmd = crate::command::exec::ExecResumeCommand::new().last().json();
233        let events = Arc::new(Mutex::new(Vec::new()));
234        let events_clone = Arc::clone(&events);
235
236        stream_exec_resume(&codex, &cmd, move |event| {
237            events_clone.lock().unwrap().push(event);
238        })
239        .await
240        .unwrap();
241
242        let events = events.lock().unwrap();
243        assert!(!events.is_empty(), "expected at least one event");
244    }
245
246    #[tokio::test]
247    async fn stream_exec_timeout() {
248        let codex = Codex::builder()
249            .binary("/bin/bash")
250            .arg("-c")
251            .arg("sleep 10")
252            .timeout(std::time::Duration::from_millis(50))
253            .build()
254            .unwrap();
255
256        let cmd = crate::command::exec::ExecCommand::new("test").json();
257        let result = stream_exec(&codex, &cmd, |_| {}).await;
258
259        assert!(
260            matches!(result, Err(Error::Timeout { .. })),
261            "expected timeout error, got: {result:?}"
262        );
263    }
264
265    #[tokio::test]
266    async fn stream_exec_parse_error() {
267        let codex = fake_codex("fake-codex-bad-json.sh");
268        let cmd = crate::command::exec::ExecCommand::new("test").json();
269        let result = stream_exec(&codex, &cmd, |_| {}).await;
270
271        assert!(
272            matches!(result, Err(Error::Json { .. })),
273            "expected json parse error, got: {result:?}"
274        );
275    }
276}